Dec 29, 2018

As you all are aware that Google Plus is shutting down in March 2019 and so are all its services. I have had a legacy android app on play store that was using the GoogleApiClient for authentication with Google Plus services, alas, I had to upgrade the application to use the new GoogleSignInClient. And, I am glad that I did so for the following reasons:
  • GoogleSignInClient API is based on Task Api
  • It does not involve managing connection as with GoogleApiClient API's, so no callback hell and boilerplate code for managing connection state.
  • You can get other information such as user's first name, last name, and email, directly from the result.
So how does it look.
The initialization is pretty much the similar to the earlier post.
  • Initialize the GoogleSignInClient in onCreate method.
  • Initialize the GoogleSignInOptions with Profile scope, which gives you basic profile information as before. You can request for email via requestEmail on the builder.
  • As before, you can request the authorization token to perform the request on behalf of the user from your backend server, this is available as server_auth_code on the response.
  • The client verification token is available in client_token on the response.
    @Override
     protected void onCreate(Bundle savedInstanceState) {
       mActivity = this;
       GoogleSignInOptions gso = new GoogleSignInOptions.Builder(GoogleSignInOptions.DEFAULT_SIGN_IN)
           .requestIdToken(serverToken).requestServerAuthCode(serverToken).
               requestEmail().
               requestScopes(new Scope(Scopes.PROFILE))
           .build();
       mGoogleSignInClient = GoogleSignIn.getClient(this, gso);
    
     }
    
  • The fresh sign in method is mentioned below, note that you can attempt silentSignIn if user has already signed in your app before. This will be shown later.
    private void signIn() {
      Intent intent = mGoogleSignInClient.getSignInIntent();
      if (!mIntentInProgress) {
        mIntentInProgress = true;
        showProgressDialog();
        startActivityForResult(intent, RC_SIGN_IN);
       }
      }
    
  • Handling the result. We just check for the request code and call the Task getSignedInAccountFromIntent method with the intent data.
    After we obtain the task, we can just call the addOnCompleteListener method of the Task API, to check for success or failure and maybe retry on failure?.
    • In case of Success, we refresh the UI to show that the user has logged in and also store the client_tokenand server_auth_codegetProfileInfo method just extracts the relevant profile information from the user.
    • In case of failure, which can be caused, if we attempted silentSignIn, and it failed with SIGN_IN_REQUIREDerror code, we retry with fresh signin. Finally, If the request fails with SIGN_IN_FAILED, we cannot use that account for sign in and inform the user of the same.
       @Override
        protected void onActivityResult(int requestCode, int responseCode,
            Intent data) {
          mIntentInProgress = false;
         if (requestCode == RC_SIGN_IN) {
            hideProgressDialog();
            Task<GoogleSignInAccount> task =
                GoogleSignIn.getSignedInAccountFromIntent(data);
            handleSignInResult(task, false);
          }
        }
      
        private void handleSignInResult(Task<GoogleSignInAccount> task, final boolean silent) {
            task.addOnCompleteListener(new OnCompleteListener<GoogleSignInAccount>() {
              @Override
              public void onComplete(@NonNull Task<GoogleSignInAccount> task) {
                if (task.isSuccessful()) {
                  GoogleSignInAccount result = task.getResult();
                  isSignedIn = true;
                  invalidateOptionsMenu();
                  SharedPreferences.Editor editor = preferences.edit();
                  editor.putString("client_token", result.getIdToken());
                  editor.putString("server_auth_code", result.getServerAuthCode());
                  editor.apply();
                  getProfileInfo(result);
                } else {
                  Exception e = task.getException();
                  if (e instanceof ApiException) {
                    ApiException apiEx = (ApiException) e;
                    if (silent && apiEx.getStatusCode() == GoogleSignInStatusCodes.SIGN_IN_REQUIRED) {
                      signIn();
                    }
                    if (apiEx.getStatusCode() == GoogleSignInStatusCodes.SIGN_IN_FAILED) {
                      FeedReaderApplication
                          .showSnackOrToast(findViewById(R.id.main_parent_view), R.string.sign_in_failed,
                              true);
                    }
                  }
                }
              }
            });
          }
      
  • Silent Sign In: If the user has already signed in earlier, and we have obtained the tokens, instead of starting the sign in flow, we can attempt silentSignIn. We already handle the failures in handleSignInResult method. In addition, silentSignIn should be our default and should be called in onResume method of activity. Before calling silentSignIn, we can check whether user is connected to the internet.
     private void attemptSilentSignIn() {
        Task<GoogleSignInAccount> task = mGoogleSignInClient.silentSignIn();
        handleSignInResult(task, true);
     }
    
     private void signInUsingNewAPI() {
      ConnectionChecker checker = new ConnectionChecker(FeedReaderApplication.getAppContext());
      boolean isConnected = checker.isConnectedToInternet();
      if (!isSignedIn && isConnected) {
        attemptSilentSignIn();
      }
     }
    
  • Signing Out:
     public void signOutFromGoogle() {
        mGoogleSignInClient.signOut();
        isSignedIn = false;
      }
    
  • Revoking Access:
     public void revokeGoogleAccess() {
        mGoogleSignInClient.revokeAccess();
        isSignedIn = false;
      }
    
  • Finally, A helper method is shown below for extracting user profile information from GoogleSignInAccount
    public void getProfileInfo(GoogleSignInAccount account) {
       String personName = account.getDisplayName();
       String firstName = account.getDisplayName();
       String lastName = account.getDisplayName();
       Uri photoUrl = account.getPhotoUrl();
       String email = account.getEmail();
     }
    

Conclusions

The Task API and GoogleSignInClient makes it a lot easier to manage sign in process and flow. You don't have to take my word for it, just look at the earlier post.

Posted on Saturday, December 29, 2018 by Unknown

Dec 21, 2018

In this post, I will cover a tutorial that involves different moving pieces. It covers the following:
  • Java WatchService
  • Spring Boot
  • Initialization-on-demand holder idiom
  • Managing concurrency
  • RXJava
  • Lombok (because why type more?)
The example will expose a Spring Boot REST service that exposes csv file records from a directory. In addition, there is a WatchService that monitors the directory for changes, specifically only creation and removal of CSV files.

Let's start with the pieces

  • We want to access the records for a CSV file. So, the first thing we need to do is either search the directory for the csv file or maintain a in memory lookup containing the path of a file. If choosing a in memory lookup, lookups need to be fast, so a HashMap like structure with Map<String,Path> should suffice.
  • But, we need to somehow update the entries in that map after they are added or deleted and this needs to happen concurrently from the watch service, so a better data structure would be a ConcurrentHashMap<String,Path>
  • WatchService needs to run in the background periodically. A RxJava interval stream with Schedulers.io() should suffice.
 Flowable.interval(5, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).forEach(//do something)
  • Initialization-on-demand holder idiom : This is used to for safely creating a singleton instance of our map. This happens because of lazy and sequential guarantees of class initialization.
//Lazy holder idiom for initializing the singleton map
private static class CSVMapping {

  private static final ResourceBundle rb = ResourceBundle.getBundle("app");
  //safe creation
  private static final Map<String, Path> CSVMap = getCSVMap();
  • Reading CSV using RXJava: I covered this in an earlier post.
  • Creating a Spring Boot REST Controller: We just create a REST controller for Spring Boot that checks whether the file name in the GET request exists or not, if it exists we just need to collect the CSV records and return them as Response, or return 404 error.

Putting it all together:

The code below shows the entire service.
@Slf4j
@RestController
public class CSVFileWatcher {

  @GetMapping("/getcsv/{fileName}")
  public ResponseEntity<List<Iterator<String>>> readCSVFile(
      @PathVariable("fileName") String fileName) {
    val dirMap = CSVMapping.CSVMap;
    if (dirMap.containsKey(fileName)) {
      return new ResponseEntity<>(
          CSVUtil.readRecordsFromFile(dirMap.get(fileName), CSVFormat.DEFAULT)
              .map(r -> r.iterator())
              .subscribeOn(Schedulers.io()).toList()
              .blockingGet(),
          HttpStatus.OK);
    } else {
      return new ResponseEntity<>(HttpStatus.NOT_FOUND);
    }
  }

  //Lazy holder idiom for initializing the singleton map
  private static class CSVMapping {

    private static final ResourceBundle rb = ResourceBundle.getBundle("app");
    private static final Map<String, Path> CSVMap = getCSVMap();

    private static boolean isCSV(Path path) {
      try {
        val contentType = Files.probeContentType(path);
        if (contentType != null && contentType.equals("text/csv")) {
          return true;
        }
      } catch (IOException e) {
        log.error("Unable to probe content type", e);
      }
      return false;
    }

    private static void registerFileWatcher(Path path, Map<String, Path> map) {
      try {
        val watchService = FileSystems.getDefault().newWatchService();
        path.register(watchService, ENTRY_CREATE, ENTRY_DELETE);
        Flowable.interval(5, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).forEach(t -> {
          val key = watchService.poll();
          if (key != null) {
            for (WatchEvent event : key.pollEvents()) {
              val kind = event.kind();
              if (kind == OVERFLOW) {
                continue;
              }
              val fileEvent = (WatchEvent<Path>) event;
              Path dir = (Path) key.watchable();
              Path fullPath = dir.resolve(fileEvent.context());
              if (kind == ENTRY_CREATE && isCSV(fullPath)) {
                log.debug("New CSV file detected {}", fullPath.toString());
                map.put(fullPath.getFileName().toString(),
                    fullPath);
              } else if (kind == ENTRY_DELETE && isCSV(fullPath)) {
                log.debug("CSV file {} deleted", fullPath.toString());
                map.remove(fullPath.getFileName().toString());
              }
            }
            key.reset();
          }
        });

      } catch (IOException e) {
        log.error("error occurred", e);
        //ignore or throw
      }
    }

    private static Map<String, Path> getCSVMap() throws RuntimeException {
      final Map CSVMapping = new ConcurrentHashMap<String, Path>();
      val path = Paths.get(rb.getString("directory_loc"));
      try (Stream<Path> files = Files.list(path).map(Path::toAbsolutePath)
          .filter(CSVFileWatcher.CSVMapping::isCSV)) {
        files.parallel()
            .forEach(p -> CSVMapping.put(p.getFileName().toString(), p));
      } catch (IOException e) {
        log.error("Error occurred", e);
        throw new RuntimeException(e);
      }
      registerFileWatcher(path, CSVMapping);
      return CSVMapping;
    }

  }
}

Posted on Friday, December 21, 2018 by Unknown

Dec 15, 2018

RXJava is an extremely useful streaming framework (here is an example application using it for parallel processing of restful calls to both uber and lyft (RT_UBER_NYC_TAXI)). However, In this post, I will cover how you can reactively stream and process a CSV file.
Firstly, you can create a Flowable of CSVRecord (commons-csv) by converting iterator to Flowable using the call Flowable.fromIterable(). Next, we want this to be safe resource usage i.e. we don't want to leave open file handles, so we use the resource safe Flowable.using(Callable resourceSupplier, Function> sourceSupplier, Consumer resourceDisposer) method call, where the last argument is a resource disposer.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public static Flowable<CSVRecord> readRecordsFromFile(Path inputFile,CSVFormat csvFormat) {
   return Flowable.using(() -> Files.newBufferedReader(inputFile),
       bufferedReader -> csvRecordFlowable(bufferedReader, csvFormat.withHeader()),
       BufferedReader::close
   );
 }

 private static Flowable<CSVRecord> csvRecordFlowable(BufferedReader br, CSVFormat csvFormat) {
   try{
     final CSVParser csvParser = new CSVParser(br,csvFormat);
     return Flowable.fromIterable(() -> csvParser.iterator());
   } catch (IOException e){
     throw new RuntimeException(e);
   }
 }
This nicely sets up a Flowable which can then be processed in different ways and you get all the Flowable features like backpressure, etc. Example usage mentioned below.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
readRecordsFromFile(Paths.get("sample.csv")).parallel().
 runOn(Schedulers.io()).filter(/*filter here*/).map(/*map here*/).sequential().subscribe(new Subscriber<CSVRecord>(){
   Subscription sub = null;

   @Override
   public void onSubscribe(Subscription s){
     sub = s;
   }
   @Override
   public void onNext(CSVRecord record){
     //do something here
     //request next item
     sub.request(1)
   }
   @Override
   public void onError(Throwable t){
     //handle error
   }
   @Override
   public void onComplete(){

   }
 })

Posted on Saturday, December 15, 2018 by Unknown