Elasticsearch 6.0: create index, bulk insert and delete data via Java

In this tutorial we set up a local Elasticsearch 6.0 server and use the Java API on Windows to create indices and to insert, delete, and query data. Elasticsearch is a distributed full-text NoSQL search engine (data is stored in JSON format) based on Apache Lucene and written in Java. Apache Solr and Elasticsearch are the most prevalent search servers. The following example code is provided as a Maven project on Git.

1. Elasticsearch prerequisites

  • Download and install Java 8 (JDK 1.8); remember to set the JAVA_HOME environment variable (on Windows)
  • Download the zipped Elasticsearch server 6.0
  • Maven to run the example Java code (How to set up Maven?)

2. Installation

  1. If not already installed, install Java 8 (JDK 1.8) from the link above. Set your JAVA_HOME environment variable to the JDK or JRE folder of your Java installation. On startup, Elasticsearch automatically uses the Java version that JAVA_HOME points to.
  2. Unzip the Elasticsearch archive. We use the standard development configuration settings (not intended for production) with one exception: we changed the cluster name (cluster.name) to “tutorial-academy-cluster” in the config/elasticsearch.yml file.
  3. Run bin/elasticsearch.bat to start the server.
  4. Go to http://localhost:9200 to check whether the server started correctly. You should see output similar to this:
    {
      "name" : "BfbaDq9",
      "cluster_name" : "tutorial-academy-cluster",
      "cluster_uuid" : "czbJ1zpFS6Knhq0s7Jj1eA",
      "version" : {
        "number" : "6.0.0",
        "build_hash" : "8f0685b",
        "build_date" : "2017-11-10T18:41:22.859Z",
        "build_snapshot" : false,
        "lucene_version" : "7.0.1",
        "minimum_wire_compatibility_version" : "5.6.0",
        "minimum_index_compatibility_version" : "5.0.0"
      },
      "tagline" : "You Know, for Search"
    }
  5. The server is running, so let's move on to some code: clone the Git repository.
  6. If not already done, install Maven and run the clean install command to build the project.
  7. Alternatively, you can import and execute the code in an IDE such as Eclipse or IntelliJ.

3. Code step-by-step

We use a helper class called ElasticSearchConnector to abstract some functionality. Let us start with initializing an Elasticsearch client to communicate with the server. In our code this happens in the constructor, along with a few settings: we set the cluster name as an example, but effectively ignore it one line later (client.transport.ignore_cluster_name) so the code works with any cluster configuration. Finally, we connect to localhost on port 9300, as can be seen in the main method later on.

	private TransportClient client = null;
	
	public ElasticSearchConnector( String clusterName, String clusterIp, int clusterPort ) throws UnknownHostException {
		
		Settings settings = Settings.builder()
				.put( "cluster.name", clusterName )
				.put( "client.transport.ignore_cluster_name", true )
				.put( "client.transport.sniff", true )
				.build();

		// create the transport client and connect to the given node
		client = new PreBuiltTransportClient( settings );
		client.addTransportAddress( new TransportAddress( InetAddress.getByName( clusterIp ), clusterPort ) );

		logger.info( "Connection " + clusterName + "@" + clusterIp + ":" + clusterPort + " established!" );
	}

Now that we are connected to the server, the Java API offers functionality to, for example, check the status of the cluster. We wait for the GREEN status, which indicates that the cluster is healthy, i.e. synchronized and ready to work with.

	public boolean isClusterHealthy() {

		final ClusterHealthResponse response = client
			    .admin()
			    .cluster()
			    .prepareHealth()
			    .setWaitForGreenStatus()
			    .setTimeout( TimeValue.timeValueSeconds( 2 ) )
			    .execute()
			    .actionGet();

		if ( response.isTimedOut() ) {
			logger.info( "The cluster is unhealthy: " + response.getStatus() );
			return false;
		}

		logger.info( "The cluster is healthy: " + response.getStatus() );
		return true;
	}

The cluster is ready, so we can start creating an index. Before doing so, we check whether an index with the same name already exists.

	public boolean isIndexRegistered( String indexName ) {
		// check if index already exists
		final IndicesExistsResponse ieResponse = client
			    .admin()
			    .indices()
			    .prepareExists( indexName )
			    .get( TimeValue.timeValueSeconds( 1 ) );
			
		// index not there
		if ( !ieResponse.isExists() ) {
			return false;
		}
		
		logger.info( "Index already created!" );
		return true;
	}

If the index does not already exist, we create it.

	public boolean createIndex( String indexName, String numberOfShards, String numberOfReplicas ) {
		CreateIndexResponse createIndexResponse = 
			client.admin().indices().prepareCreate( indexName )
        	.setSettings( Settings.builder()             
                .put("index.number_of_shards", numberOfShards ) 
                .put("index.number_of_replicas", numberOfReplicas )
        	)
        	.get(); 
				
		if( createIndexResponse.isAcknowledged() ) {
			logger.info("Created Index with " + numberOfShards + " Shard(s) and " + numberOfReplicas + " Replica(s)!");
			return true;
		}
		
		return false;				
	}

You can go to http://localhost:9200/_cat/indices to check whether the index (tutorial-academy) was created and what its status is. The columns show the health and status of the index, its name and UUID, the number of primary shards and replicas, the number of indexed and deleted documents, and the total and primary store sizes:

green open tutorial-academy cpEWSiOvRYGlLqKeJpjhKA 1 0 4 1 4.5kb 4.5kb

After successfully creating the index, we start to load some data. The inserted documents correspond to this JSON file:

[
	{ 
		"name" : "Mark Twain",
		"age"  : 75
	},
	{
		"name" : "Tom Saywer",
		"age"  : 12
	},
	{
		"name" : "John Doe",
		"age"  : 20
	},
	{
		"name" : "Peter Pan",
		"age"  : 15
	},
	{
		"name" : "Johnnie Walker",
		"age"  : 37
	}
]

We want to index a JSON array consisting of objects with the properties name and age, using a bulk insert to index all the data at once. In our tests, the cluster health status was sometimes not ready when we ran a search or delete query directly after the insert. Consequently, we call the setRefreshPolicy( RefreshPolicy.IMMEDIATE ) method to signal the server to refresh the index after the request, so the data can be queried directly afterwards.

	public boolean bulkInsert( String indexName, String indexType ) throws IOException {
		// set the refresh policy once for the whole bulk request
		BulkRequestBuilder bulkRequest = client.prepareBulk().setRefreshPolicy( RefreshPolicy.IMMEDIATE );

		// either use client#prepare, or use Requests# to directly build index/delete requests
		bulkRequest.add( 
			client.prepareIndex( indexName, indexType, null )
		        .setSource( XContentFactory.jsonBuilder()
	                .startObject()
	                    .field( "name", "Mark Twain" )
	                    .field( "age", 75 )
	                .endObject()
	    ));

		bulkRequest.add( 
			client.prepareIndex( indexName, indexType, null )
		        .setSource( XContentFactory.jsonBuilder()
	                .startObject()
	                    .field( "name", "Tom Saywer" )
	                    .field( "age", 12 )
	                .endObject()
		));
		
		bulkRequest.add( 
			client.prepareIndex( indexName, indexType, null )
		        .setSource( XContentFactory.jsonBuilder()
	                .startObject()
	                    .field( "name", "John Doe" )
	                    .field( "age", 20 )
	                .endObject()
		));
		
		bulkRequest.add( 
			client.prepareIndex( indexName, indexType, null )
		        .setSource( XContentFactory.jsonBuilder()
	                .startObject()
	                    .field( "name", "Peter Pan" )
	                    .field( "age", 15 )
	                .endObject()
		));
		
		bulkRequest.add( 
			client.prepareIndex( indexName, indexType, null )
		        .setSource( XContentFactory.jsonBuilder()
	                .startObject()
	                    .field( "name", "Johnnie Walker" )
	                    .field( "age", 37 )
	                .endObject()
		));

		BulkResponse bulkResponse = bulkRequest.get();
		if ( bulkResponse.hasFailures() ) {
		    // process failures by iterating through each bulk response item
			logger.info( "Bulk insert failed!" );
			return false;
		}
		
		return true;
	}
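
The main method shown later also references a commented-out bulkInsert overload that reads the documents from the data.json file instead of hard-coding them. The actual implementation is in the Git repository; judging by the commented-out ParseException catch block in the main method, it presumably uses json-simple, so a minimal sketch under that assumption could look like this:

	// sketch of the file-based overload; assumes json-simple (org.json.simple.*) on the classpath
	public boolean bulkInsert( String indexName, String indexType, String jsonPath ) throws IOException, ParseException {
		BulkRequestBuilder bulkRequest = client.prepareBulk().setRefreshPolicy( RefreshPolicy.IMMEDIATE );

		// parse the top-level JSON array from the file
		JSONArray array = (JSONArray) new JSONParser().parse( new FileReader( jsonPath ) );

		for ( Object entry : array ) {
			JSONObject object = (JSONObject) entry;
			bulkRequest.add( client.prepareIndex( indexName, indexType, null )
				.setSource( XContentFactory.jsonBuilder()
					.startObject()
						.field( "name", object.get( "name" ) )
						.field( "age", object.get( "age" ) )
					.endObject() ) );
		}

		return !bulkRequest.get().hasFailures();
	}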

To query the data, we use a SearchResponse in combination with a scroll. A scroll is essentially the Elasticsearch counterpart to a cursor in a traditional SQL database. This sort of query is overkill for our example and serves demonstration purposes only: scrolls are meant for retrieving large amounts of data (not five documents, as in our case) and are not intended for real-time user requests.

	public void queryResultsWithAgeFilter( String indexName, int from, int to ) {
		SearchResponse scrollResp = 
			client.prepareSearch( indexName )
			// sort order
	        .addSort( FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC )
	        // keep results for 60 seconds
	        .setScroll( new TimeValue( 60000 ) )
	        // filter for age
	        .setPostFilter( QueryBuilders.rangeQuery( "age" ).from( from ).to( to ) )
	        // maximum of 100 hits will be returned for each scroll
	        .setSize( 100 ).get(); 
		
		// scroll until no hits are returned
		int count = 1;
		do {
		    for ( SearchHit hit : scrollResp.getHits().getHits() ) {
		    	Map<String,Object> res = hit.getSourceAsMap();
		    	
		    	// print results
		    	for( Map.Entry<String,Object> entry : res.entrySet() ) {
		    		logger.info( "[" + count + "] " + entry.getKey() + " --> " + entry.getValue() );
		    	}
		    	count++;
		    }

		    scrollResp = client.prepareSearchScroll( scrollResp.getScrollId() ).setScroll( new TimeValue(60000) ).execute().actionGet();
		// zero hits mark the end of the scroll and the while loop.
		} while( scrollResp.getHits().getHits().length != 0 ); 
	}
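
For a handful of documents, or for real-time user requests, a plain search without a scroll is the better fit. A minimal sketch (not part of the tutorial project), reusing the same age filter, could look like this:

	// plain search without a scroll: fine for small result sets and real-time requests
	SearchResponse response = client.prepareSearch( indexName )
			.setQuery( QueryBuilders.rangeQuery( "age" ).from( from ).to( to ) )
			.setSize( 10 ) // return at most 10 hits
			.get();

	for ( SearchHit hit : response.getHits().getHits() ) {
		logger.info( hit.getSourceAsString() );
	}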

After successfully querying data, we delete documents matching a key-value pair to dig a little deeper into Elasticsearch's behavior.

	public long delete( String indexName, String key, String value ) {
		BulkByScrollResponse response =
			    DeleteByQueryAction.INSTANCE.newRequestBuilder( client )
			        .filter( QueryBuilders.matchQuery( key, value ) ) 
			        .source( indexName )
			        .refresh( true )
			        .get();                                             

		logger.info( "Deleted " + response.getDeleted() + " element(s)!" );
		
		return response.getDeleted();
	}

Now let us put it all together in a small main method. Here we read properties such as the number of shards and the index name from a properties file and call the methods described above.

	public static void main( String[] args ) 
    {
		logger.info( "Starting..." );
		// read properties
		PropertyReader properties = null;
		ElasticSearchConnector es = null;
		
		try {
			properties = new PropertyReader( getRelativeResourcePath( "config.properties" ) );
			
			String numberOfShards  	= properties.read( NUMBER_OF_SHARDS );
			String numberOfReplicas	= properties.read( NUMBER_OF_REPLICAS );
			
			String clusterName 		= properties.read( CLUSTER_NAME );
			
			String indexName 		= properties.read( INDEX_NAME );
			String indexType 		= properties.read( INDEX_TYPE );
			
			String ip 				= properties.read( IP );
			int    port				= Integer.parseInt( properties.read( PORT ) );
		
			es = new ElasticSearchConnector( clusterName, ip, port );
			
			// check if the Elasticsearch cluster is healthy
			es.isClusterHealthy();
			
			// check if the index already exists
			if( !es.isIndexRegistered( indexName ) ) {
				// create the index if it does not exist yet
				es.createIndex( indexName, numberOfShards, numberOfReplicas );
				// manually insert some test data			
				es.bulkInsert( indexName, indexType );
				// insert some test data (from JSON file)
//				es.bulkInsert( indexName, indexType, getRelativeResourcePath( "data.json" ) );
			}
			
			// retrieve elements from the user data where age is between 15 and 50
			es.queryResultsWithAgeFilter( indexName, 15, 50 );
			
			es.delete( indexName, "name", "Peter Pan" );

			// retrieve elements from the user data where age is between 15 and 50
			es.queryResultsWithAgeFilter( indexName, 15, 50 );
		}
		catch ( FileNotFoundException e ) {
			e.printStackTrace();
		}
		catch ( UnknownHostException e ) {
			e.printStackTrace();
		}
		catch ( IOException e ) {
			e.printStackTrace();
		} 
		// required when parsing JSON
//		catch ( ParseException e ) {
//			e.printStackTrace();
//		}
		finally {
			// guard against a failed constructor call
			if ( es != null ) {
				es.close();
			}
		}
    }
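
The close() method called in the finally block is one of the helpers kept in the Git repository; it essentially just releases the transport client, roughly like this:

	public void close() {
		// release the transport client and its resources
		if ( client != null ) {
			client.close();
		}
	}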

Imports and missing methods can be found in the Git repository. The main method performs the following steps:

  1. Read configuration parameters from the properties file
  2. Connect to the Elasticsearch server
  3. Check the status of the server
  4. Check if our defined index is already created
  5. If not, create and insert data
  6. Query data using an age filter
  7. Remove one document
  8. Query data again

4. Conclusion

That is it: you successfully deployed your Elasticsearch application. We demonstrated only some basic concepts and interactions with the server. The Java API, and especially the REST API, offer far more functionality. If you are interested, we recommend checking them out and diving deeper. Fast search access to data is becoming more and more relevant and is a bonus for every developer out there.

Get familiar with the way Elasticsearch works, how to work with the requests, and how to deploy in production, and especially determine whether you require synchronous calls (wait for the operation to finish) or asynchronous calls (a callback function is invoked when done, with no blocking). The code above uses synchronous calls: each statement executes only after the server call has returned.
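
As an illustration of the asynchronous style, here is a minimal sketch using the Java API's ActionListener (not part of the tutorial project): execute() returns immediately, and the listener is invoked once the response or a failure arrives.

	// asynchronous cluster health check: the calling thread is not blocked
	client.admin().cluster().prepareHealth()
			.setWaitForGreenStatus()
			.execute( new ActionListener<ClusterHealthResponse>() {
				@Override
				public void onResponse( ClusterHealthResponse response ) {
					logger.info( "The cluster is healthy: " + response.getStatus() );
				}

				@Override
				public void onFailure( Exception e ) {
					logger.info( "Health check failed: " + e.getMessage() );
				}
			} );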

If you have problems or errors, feel free to comment and ask.
