From 7621132097a90149cdda55991ba7efb73dc44beb Mon Sep 17 00:00:00 2001 From: Jakob Andersen Date: Mon, 5 Feb 2018 23:17:16 +0100 Subject: [PATCH] Authenticated cloud-to-prod interop tests. (#55) Added authentication provider classes, and wired up the auth interop tests. Refactored connection logic to throw initial connection errors early. Fixes #53 --- CHANGELOG.md | 7 ++ example/googleapis/bin/logging.dart | 57 ---------- example/googleapis/pubspec.yaml | 1 - interop/lib/src/client.dart | 162 +++++++++++++++++++++++++-- lib/grpc.dart | 2 + lib/src/auth/auth.dart | 164 ++++++++++++++++++++++++++++ lib/src/client/call.dart | 22 +++- lib/src/client/connection.dart | 43 +++++--- lib/src/client/options.dart | 10 +- pubspec.yaml | 3 +- test/client_test.dart | 24 ++-- 11 files changed, 390 insertions(+), 105 deletions(-) create mode 100644 lib/src/auth/auth.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index a3dfd51f..cdc84548 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.3.0 - 2018-02-05 + +* Added authentication metadata providers, optimized for use with Google Cloud. +* Added service URI to metadata provider API, needed for Json Web Token generation. +* Added authenticated cloud-to-prod interoperability tests. +* Refactored connection logic to throw initial connection errors early. + ## 0.2.1 - 2018-01-18 * Updated generated code in examples using latest protoc compiler plugin. diff --git a/example/googleapis/bin/logging.dart b/example/googleapis/bin/logging.dart index 2c00058b..4841b674 100644 --- a/example/googleapis/bin/logging.dart +++ b/example/googleapis/bin/logging.dart @@ -14,72 +14,15 @@ // limitations under the License. import 'dart:async'; -import 'dart:convert'; import 'dart:io'; -import 'package:googleapis_auth/auth_io.dart' as auth; import 'package:grpc/grpc.dart'; -import 'package:http/http.dart' as http; import 'package:googleapis/src/generated/google/api/monitored_resource.pb.dart'; import 'package:googleapis/src/generated/google/logging/type/log_severity.pb.dart'; import 'package:googleapis/src/generated/google/logging/v2/log_entry.pb.dart'; import 'package:googleapis/src/generated/google/logging/v2/logging.pbgrpc.dart'; -const _tokenExpirationThreshold = const Duration(seconds: 30); - -class ServiceAccountAuthenticator { - auth.ServiceAccountCredentials _serviceAccountCredentials; - final List _scopes; - String _projectId; - - auth.AccessToken _accessToken; - Future _call; - - ServiceAccountAuthenticator(String serviceAccountJson, this._scopes) { - final serviceAccount = JSON.decode(serviceAccountJson); - _serviceAccountCredentials = - new auth.ServiceAccountCredentials.fromJson(serviceAccount); - _projectId = serviceAccount['project_id']; - } - - String get projectId => _projectId; - - Future authenticate(Map metadata) async { - if (_accessToken == null || _accessToken.hasExpired) { - await _obtainAccessCredentials(); - } - - metadata['authorization'] = 'Bearer ${_accessToken.data}'; - - if (_tokenExpiresSoon) { - // Token is about to expire. Extend it prematurely. - _obtainAccessCredentials().catchError((_) {}); - } - } - - bool get _tokenExpiresSoon => _accessToken.expiry - .subtract(_tokenExpirationThreshold) - .isBefore(new DateTime.now().toUtc()); - - Future _obtainAccessCredentials() { - if (_call == null) { - final authClient = new http.Client(); - _call = auth - .obtainAccessCredentialsViaServiceAccount( - _serviceAccountCredentials, _scopes, authClient) - .then((credentials) { - _accessToken = credentials.accessToken; - _call = null; - authClient.close(); - }); - } - return _call; - } - - CallOptions get toCallOptions => new CallOptions(providers: [authenticate]); -} - Future main() async { final serviceAccountFile = new File('logging-service-account.json'); if (!serviceAccountFile.existsSync()) { diff --git a/example/googleapis/pubspec.yaml b/example/googleapis/pubspec.yaml index 11c3cfc8..b62fef2e 100644 --- a/example/googleapis/pubspec.yaml +++ b/example/googleapis/pubspec.yaml @@ -8,7 +8,6 @@ environment: dependencies: async: ^1.13.3 - googleapis_auth: ^0.2.3+6 grpc: path: ../../ protobuf: ^0.7.0 diff --git a/interop/lib/src/client.dart b/interop/lib/src/client.dart index bd072f17..5c891220 100644 --- a/interop/lib/src/client.dart +++ b/interop/lib/src/client.dart @@ -19,7 +19,6 @@ import 'dart:typed_data'; import 'package:collection/collection.dart'; import 'package:grpc/grpc.dart'; - import 'package:interop/src/generated/empty.pb.dart'; import 'package:interop/src/generated/messages.pb.dart'; import 'package:interop/src/generated/test.pbgrpc.dart'; @@ -40,6 +39,17 @@ class Tester { String defaultServiceAccount; String oauthScope; String serviceAccountKeyFile; + String _serviceAccountJson; + + String get serviceAccountJson => + _serviceAccountJson ??= _readServiceAccountJson(); + + String _readServiceAccountJson() { + if (serviceAccountKeyFile?.isEmpty ?? true) { + throw 'Service account key file not specified.'; + } + return new File(serviceAccountKeyFile).readAsStringSync(); + } void set serverPort(String value) { if (value == null) { @@ -61,6 +71,7 @@ class Tester { _useTestCA = value == 'true'; } + ClientChannel channel; TestServiceClient client; UnimplementedServiceClient unimplementedServiceClient; @@ -90,7 +101,7 @@ class Tester { options = new ChannelOptions.insecure(); } - final channel = + channel = new ClientChannel(serverHost, port: _serverPort, options: options); client = new TestServiceClient(channel); unimplementedServiceClient = new UnimplementedServiceClient(channel); @@ -124,6 +135,8 @@ class Tester { return emptyStream(); case 'compute_engine_creds': return computeEngineCreds(); + case 'service_account_creds': + return serviceAccountCreds(); case 'jwt_token_creds': return jwtTokenCreds(); case 'oauth2_auth_token': @@ -458,7 +471,8 @@ class Tester { responses.map((response) => response.payload.body.length).toList(); if (!new ListEquality().equals(responseLengths, expectedResponses)) { - throw 'Incorrect response lengths received (${responseLengths.join(', ')} != ${expectedResponses.join(', ')})'; + throw 'Incorrect response lengths received (${responseLengths.join( + ', ')} != ${expectedResponses.join(', ')})'; } } @@ -571,7 +585,8 @@ class Tester { requests.add(index); await for (final response in responses) { if (index >= expectedResponses.length) { - throw 'Received too many responses. $index > ${expectedResponses.length}.'; + throw 'Received too many responses. $index > ${expectedResponses + .length}.'; } if (response.payload.body.length != expectedResponses[index]) { throw 'Response mismatch for response $index: ' @@ -638,6 +653,62 @@ class Tester { /// * clients are free to assert that the response payload body contents are /// zero and comparing the entire response message against a golden response Future computeEngineCreds() async { + final credentials = new ComputeEngineAuthenticator(); + final clientWithCredentials = + new TestServiceClient(channel, options: credentials.toCallOptions); + + final response = await _sendSimpleRequestForAuth(clientWithCredentials, + fillUsername: true, fillOauthScope: true); + + final user = response.username; + final oauth = response.oauthScope; + + if (user?.isEmpty ?? true) { + throw 'Username not received.'; + } + if (oauth?.isEmpty ?? true) { + throw 'OAuth scope not received.'; + } + + if (!serviceAccountJson.contains(user)) { + throw 'Got user name $user, which is not a substring of $serviceAccountJson'; + } + if (!oauthScope.contains(oauth)) { + throw 'Got OAuth scope $oauth, which is not a substring of $oauthScope'; + } + } + + /// This test is only for cloud-to-prod path. + /// + /// This test verifies unary calls succeed in sending messages while using + /// service account credentials. + /// + /// Test caller should set flag `--service_account_key_file` with the path to + /// json key file downloaded from https://console.developers.google.com. + /// Alternately, if using a usable auth implementation, she may specify the + /// file location in the environment variable GOOGLE_APPLICATION_CREDENTIALS. + /// + /// Procedure: + /// 1. Client configures the channel to use ServiceAccountCredentials + /// 2. Client calls UnaryCall with: + /// { + /// response_size: 314159 + /// payload: { + /// body: 271828 bytes of zeros + /// } + /// fill_username: true + /// } + /// + /// Client asserts: + /// * call was successful + /// * received SimpleResponse.username is not empty and is in the json key + /// file used by the auth library. The client can optionally check the + /// username matches the email address in the key file or equals the value + /// of `--default_service_account` flag. + /// * response payload body is 314159 bytes in size + /// * clients are free to assert that the response payload body contents are + /// zero and comparing the entire response message against a golden response + Future serviceAccountCreds() async { throw 'Not implemented'; } @@ -672,7 +743,19 @@ class Tester { /// * clients are free to assert that the response payload body contents are /// zero and comparing the entire response message against a golden response Future jwtTokenCreds() async { - throw 'Not implemented'; + final credentials = new JwtServiceAccountAuthenticator(serviceAccountJson); + final clientWithCredentials = + new TestServiceClient(channel, options: credentials.toCallOptions); + + final response = await _sendSimpleRequestForAuth(clientWithCredentials, + fillUsername: true); + final username = response.username; + if (username?.isEmpty ?? true) { + throw 'Username not received.'; + } + if (!serviceAccountJson.contains(username)) { + throw 'Got user name $username, which is not a substring of $serviceAccountJson'; + } } /// This test is only for cloud-to-prod path and some implementations may run @@ -715,7 +798,30 @@ class Tester { /// check against the json key file or GCE default service account email. /// * received SimpleResponse.oauth_scope is in `--oauth_scope` Future oauth2AuthToken() async { - throw 'Not implemented'; + final credentials = + new ServiceAccountAuthenticator(serviceAccountJson, [oauthScope]); + final clientWithCredentials = + new TestServiceClient(channel, options: credentials.toCallOptions); + + final response = await _sendSimpleRequestForAuth(clientWithCredentials, + fillUsername: true, fillOauthScope: true); + + final user = response.username; + final oauth = response.oauthScope; + + if (user?.isEmpty ?? true) { + throw 'Username not received.'; + } + if (oauth?.isEmpty ?? true) { + throw 'OAuth scope not received.'; + } + + if (!serviceAccountJson.contains(user)) { + throw 'Got user name $user, which is not a substring of $serviceAccountJson'; + } + if (!oauthScope.contains(oauth)) { + throw 'Got OAuth scope $oauth, which is not a substring of $oauthScope'; + } } /// Similar to the other auth tests, this test is only for cloud-to-prod path. @@ -747,7 +853,49 @@ class Tester { /// file used by the auth library. The client can optionally check the /// username matches the email address in the key file. Future perRpcCreds() async { - throw 'Not implemented'; + final credentials = + new ServiceAccountAuthenticator(serviceAccountJson, [oauthScope]); + + final response = await _sendSimpleRequestForAuth(client, + fillUsername: true, + fillOauthScope: true, + options: credentials.toCallOptions); + + final user = response.username; + final oauth = response.oauthScope; + + if (user?.isEmpty ?? true) { + throw 'Username not received.'; + } + if (oauth?.isEmpty ?? true) { + throw 'OAuth scope not received.'; + } + + if (!serviceAccountJson.contains(user)) { + throw 'Got user name $user, which is not a substring of $serviceAccountJson'; + } + if (!oauthScope.contains(oauth)) { + throw 'Got OAuth scope $oauth, which is not a substring of $oauthScope'; + } + } + + Future _sendSimpleRequestForAuth(TestServiceClient client, + {bool fillUsername: false, + bool fillOauthScope: false, + CallOptions options}) async { + final payload = new Payload()..body = new Uint8List(271828); + final request = new SimpleRequest() + ..responseSize = 314159 + ..payload = payload + ..fillUsername = fillUsername + ..fillOauthScope = fillOauthScope; + final response = await client.unaryCall(request, options: options); + final receivedBytes = response.payload.body.length; + if (receivedBytes != 314159) { + throw 'Response payload mismatch. Expected 314159 bytes, ' + 'got ${receivedBytes}.'; + } + return response; } /// This test verifies that custom metadata in either binary or ascii format diff --git a/lib/grpc.dart b/lib/grpc.dart index 232d972b..afb014a3 100644 --- a/lib/grpc.dart +++ b/lib/grpc.dart @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +export 'src/auth/auth.dart'; + export 'src/client/call.dart'; export 'src/client/channel.dart'; export 'src/client/client.dart'; diff --git a/lib/src/auth/auth.dart b/lib/src/auth/auth.dart new file mode 100644 index 00000000..fa53ac5c --- /dev/null +++ b/lib/src/auth/auth.dart @@ -0,0 +1,164 @@ +// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:convert'; + +import 'package:googleapis_auth/auth_io.dart' as auth; +import 'package:googleapis_auth/src/crypto/rsa_sign.dart'; +import 'package:grpc/src/shared/status.dart'; +import 'package:http/http.dart' as http; + +import '../client/options.dart'; + +const _tokenExpirationThreshold = const Duration(seconds: 30); + +abstract class BaseAuthenticator { + auth.AccessToken _accessToken; + String _lastUri; + + Future authenticate(Map metadata, String uri) async { + if (uri == null) { + throw new GrpcError.unauthenticated( + 'Credentials require secure transport.'); + } + if (_accessToken == null || _accessToken.hasExpired || uri != _lastUri) { + await obtainAccessCredentials(uri); + _lastUri = uri; + } + + final auth = '${_accessToken.type} ${_accessToken.data}'; + metadata['authorization'] = auth; + + if (_tokenExpiresSoon) { + // Token is about to expire. Extend it prematurely. + obtainAccessCredentials(_lastUri).catchError((_) {}); + } + } + + bool get _tokenExpiresSoon => _accessToken.expiry + .subtract(_tokenExpirationThreshold) + .isBefore(new DateTime.now().toUtc()); + + CallOptions get toCallOptions => new CallOptions(providers: [authenticate]); + + Future obtainAccessCredentials(String uri); +} + +abstract class HttpBasedAuthenticator extends BaseAuthenticator { + Future _call; + + Future obtainAccessCredentials(String uri) { + if (_call == null) { + final authClient = new http.Client(); + _call = obtainCredentialsWithClient(authClient, uri).then((credentials) { + _accessToken = credentials.accessToken; + _call = null; + authClient.close(); + }); + } + return _call; + } + + Future obtainCredentialsWithClient(http.Client client, String uri); +} + +class ComputeEngineAuthenticator extends HttpBasedAuthenticator { + Future obtainCredentialsWithClient(http.Client client, String uri) => + auth.obtainAccessCredentialsViaMetadataServer(client); +} + +class ServiceAccountAuthenticator extends HttpBasedAuthenticator { + auth.ServiceAccountCredentials _serviceAccountCredentials; + final List _scopes; + String _projectId; + + ServiceAccountAuthenticator(String serviceAccountJson, this._scopes) { + final serviceAccount = JSON.decode(serviceAccountJson); + _serviceAccountCredentials = + new auth.ServiceAccountCredentials.fromJson(serviceAccount); + _projectId = serviceAccount['project_id']; + } + + String get projectId => _projectId; + + Future obtainCredentialsWithClient(http.Client client, String uri) => + auth.obtainAccessCredentialsViaServiceAccount( + _serviceAccountCredentials, _scopes, client); +} + +class JwtServiceAccountAuthenticator extends BaseAuthenticator { + auth.ServiceAccountCredentials _serviceAccountCredentials; + String _projectId; + String _keyId; + + JwtServiceAccountAuthenticator(String serviceAccountJson) { + final serviceAccount = JSON.decode(serviceAccountJson); + _serviceAccountCredentials = + new auth.ServiceAccountCredentials.fromJson(serviceAccount); + _projectId = serviceAccount['project_id']; + _keyId = serviceAccount['private_key_id']; + } + + String get projectId => _projectId; + + Future obtainAccessCredentials(String uri) async { + _accessToken = _jwtTokenFor(_serviceAccountCredentials, _keyId, uri); + } +} + +// TODO(jakobr): Expose in googleapis_auth. +auth.AccessToken _jwtTokenFor( + auth.ServiceAccountCredentials credentials, String keyId, String uri, + {String user, List scopes}) { + // Subtracting 20 seconds from current timestamp to allow for clock skew among + // servers. + final timestamp = + (new DateTime.now().toUtc().millisecondsSinceEpoch ~/ 1000) - 20; + final expiry = timestamp + 3600; + + final header = {'alg': 'RS256', 'typ': 'JWT'}; + if (keyId != null) { + header['kid'] = keyId; + } + + final claims = { + 'iss': credentials.email, + 'aud': uri, + 'exp': expiry, + 'iat': timestamp, + 'sub': user ?? credentials.email + }; + if (scopes != null) { + claims['scope'] = scopes.join(' '); + } + + final headerBase64 = _base64url(ASCII.encode(JSON.encode(header))); + final claimsBase64 = _base64url(UTF8.encode(JSON.encode(claims))); + + final data = '$headerBase64.$claimsBase64'; + + final signer = new RS256Signer(credentials.privateRSAKey); + final signature = signer.sign(ASCII.encode(data)); + + final jwt = '$data.${_base64url(signature)}'; + + return new auth.AccessToken('Bearer', jwt, + new DateTime.fromMillisecondsSinceEpoch(expiry * 1000, isUtc: true)); +} + +String _base64url(List bytes) { + return BASE64URL.encode(bytes).replaceAll('=', ''); +} diff --git a/lib/src/client/call.dart b/lib/src/client/call.dart index ac21026f..cb5a4d29 100644 --- a/lib/src/client/call.dart +++ b/lib/src/client/call.dart @@ -76,10 +76,10 @@ class ClientCall implements Response { static Map _sanitizeMetadata(Map metadata) { final sanitizedMetadata = {}; metadata.forEach((String key, String value) { - final lowerCaseKey = key.toLowerCase(); + final lowerCaseKey = key.trim().toLowerCase(); if (!lowerCaseKey.startsWith(':') && !_reservedHeaders.contains(lowerCaseKey)) { - sanitizedMetadata[lowerCaseKey] = value; + sanitizedMetadata[lowerCaseKey] = value.trim(); } }); return sanitizedMetadata; @@ -92,8 +92,17 @@ class ClientCall implements Response { _sendRequest(connection, _sanitizeMetadata(options.metadata)); } else { final metadata = new Map.from(options.metadata); + String audience; + if (connection.options.isSecure) { + final port = connection.port != 443 ? ':${connection.port}' : ''; + final lastSlashPos = path.lastIndexOf('/'); + final audiencePath = + lastSlashPos == -1 ? path : path.substring(0, lastSlashPos); + audience = 'https://${connection.authority}$port$audiencePath'; + } Future - .forEach(options.metadataProviders, (provider) => provider(metadata)) + .forEach(options.metadataProviders, + (provider) => provider(metadata, audience)) .then((_) => _sendRequest(connection, _sanitizeMetadata(metadata))) .catchError(_onMetadataProviderError); } @@ -104,7 +113,12 @@ class ClientCall implements Response { } void _sendRequest(ClientConnection connection, Map metadata) { - _stream = connection.makeRequest(path, options.timeout, metadata); + try { + _stream = connection.makeRequest(path, options.timeout, metadata); + } catch (e) { + _terminateWithError(new GrpcError.unavailable('Error making call: $e')); + return; + } _requestSubscription = _requests .map(_method.requestSerializer) .map(GrpcHttpEncoder.frame) diff --git a/lib/src/client/connection.dart b/lib/src/client/connection.dart index bcbb395b..f92df0a3 100644 --- a/lib/src/client/connection.dart +++ b/lib/src/client/connection.dart @@ -133,7 +133,7 @@ class ClientConnection { _setState(ConnectionState.ready); _pendingCalls.forEach(_startCall); _pendingCalls.clear(); - }).catchError(_handleTransientFailure); + }).catchError(_handleConnectionFailure); } void dispatchCall(ClientCall call) { @@ -164,10 +164,13 @@ class ClientConnection { call.onConnectionReady(this); } - void _shutdownCall(ClientCall call) { + void _failCall(ClientCall call, dynamic error) { if (call.isCancelled) return; - call.onConnectionError( - new GrpcError.unavailable('Connection shutting down.')); + call.onConnectionError(error); + } + + void _shutdownCall(ClientCall call) { + _failCall(call, 'Connection shutting down.'); } /// Shuts down this connection. @@ -232,20 +235,16 @@ class ClientConnection { return _pendingCalls.isNotEmpty; } - void _handleTransientFailure(error) { + void _handleConnectionFailure(error) { _transport = null; if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) { return; } // TODO(jakobr): Log error. _cancelTimer(); - if (!_hasPendingCalls()) { - _setState(ConnectionState.idle); - return; - } - _setState(ConnectionState.transientFailure); - _currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay); - _timer = new Timer(_currentReconnectDelay, _handleReconnect); + _pendingCalls.forEach((call) => _failCall(call, error)); + _pendingCalls.clear(); + _setState(ConnectionState.idle); } void _handleReconnect() { @@ -256,9 +255,23 @@ class ClientConnection { void _handleSocketClosed(_) { _cancelTimer(); - if (_state != ConnectionState.idle && _state != ConnectionState.shutdown) { - // We were not planning to close the socket. - _handleTransientFailure('Socket closed'); + _transport = null; + + if (_state == ConnectionState.idle && _state == ConnectionState.shutdown) { + // All good. + return; + } + + // We were not planning to close the socket. + if (!_hasPendingCalls()) { + // No pending calls. Just hop to idle, and wait for a new RPC. + _setState(ConnectionState.idle); + return; } + + // We have pending RPCs. Reconnect after backoff delay. + _setState(ConnectionState.transientFailure); + _currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay); + _timer = new Timer(_currentReconnectDelay, _handleReconnect); } } diff --git a/lib/src/client/options.dart b/lib/src/client/options.dart index 0b7ef2e5..a4600b64 100644 --- a/lib/src/client/options.dart +++ b/lib/src/client/options.dart @@ -98,10 +98,12 @@ class ChannelOptions { /// metadata to the RPC. If the function returns a [Future], the RPC will await /// completion of the returned [Future] before transmitting the request. /// -/// The metadata provider is given the current metadata map (possibly modified -/// by previous metadata providers), and is expected to modify the map before -/// returning or before completing the returned [Future]. -typedef FutureOr MetadataProvider(Map metadata); +/// The metadata provider is given the current [metadata] map (possibly modified +/// by previous metadata providers) and the [uri] that is being called, and is +/// expected to modify the map before returning or before completing the +/// returned [Future]. +typedef FutureOr MetadataProvider( + Map metadata, String uri); /// Runtime options for an RPC. class CallOptions { diff --git a/pubspec.yaml b/pubspec.yaml index a0f68373..3d0ff4ca 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: grpc description: Dart implementation of gRPC. -version: 0.2.1 +version: 0.3.0 author: Dart Team homepage: https://github.com/dart-lang/grpc-dart @@ -9,6 +9,7 @@ environment: dependencies: async: ^1.13.3 + googleapis_auth: ^0.2.3+6 meta: ^1.0.5 http2: ^0.1.7 diff --git a/test/client_test.dart b/test/client_test.dart index b1455df9..eed6e129 100644 --- a/test/client_test.dart +++ b/test/client_test.dart @@ -328,30 +328,22 @@ void main() { ); } - test('Reconnect on connection error', () async { + test('Connection errors are reported', () async { final connectionStates = []; harness.connection.connectionError = 'Connection error'; - int failureCount = 0; harness.connection.onStateChanged = (connection) { final state = connection.state; connectionStates.add(state); - if (state == ConnectionState.transientFailure) failureCount++; - if (failureCount == 2) { - harness.connection.connectionError = null; - } }; - await makeUnaryCall(); + final expectedException = + new GrpcError.unavailable('Error connecting: Connection error'); - expect(failureCount, 2); - expect(connectionStates, [ - ConnectionState.connecting, - ConnectionState.transientFailure, - ConnectionState.connecting, - ConnectionState.transientFailure, - ConnectionState.connecting, - ConnectionState.ready - ]); + await harness.expectThrows( + harness.client.unary(dummyValue), expectedException); + + expect( + connectionStates, [ConnectionState.connecting, ConnectionState.idle]); }); test('Connections time out if idle', () async {