Interfacing with Apache Cassandra 0.8 in Java.


Apache Cassandra is a NoSQL database that was originally developed at Facebook to power its Inbox Search feature. It is used by prominent companies such as Reddit, Twitter, Rackspace and Digg, to name but a few.

Cassandra's data model is built around column families. Its basic unit of storage is the column, a name-value pair (plus a timestamp). Columns are grouped into rows, each identified by a row key, and rows are grouped into a column family; a row can hold a very large number of columns. Row keys must be unique within a column family. I don't want to go into detail explaining column families here, so I'll post links instead that you should read before venturing further into this blog. There's an interesting article describing the Cassandra data model: WTF is a SuperColumn? An Intro into the Cassandra Data Model.

This is a simple introduction to Cassandra 0.8, since there have been major changes between version 0.6 and the current version.

 

Prerequisites.

Cassandra is written in Java, so you will need a Java runtime in order to run it. I'm running JDK 1.6.0_25; Cassandra 0.8 requires Java 6 or higher.
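If you're not sure which Java version a machine will pick up, the small class below is a sketch of a check (my own hypothetical helper, not part of Cassandra). It reads the standard java.specification.version system property, which is "1.6" on Java 6 and a plain "9", "11" and so on from Java 9 onwards, and compares it against an assumed minimum of Java 6 for Cassandra 0.8.

```java
/**
 * Hypothetical helper (not part of Cassandra): checks that the running JVM
 * is at least the assumed minimum version for Cassandra 0.8.
 */
public class JavaVersionCheck {

    // Assumption: Cassandra 0.8 needs Java 6 or newer
    public static final int MINIMUM_MAJOR = 6;

    /** Turns "1.6" into 6 and Java 9+ style values such as "11" into 11. */
    public static int majorVersion(String specVersion) {
        String v = specVersion.startsWith("1.") ? specVersion.substring(2) : specVersion;
        int dot = v.indexOf('.');
        return Integer.parseInt(dot >= 0 ? v.substring(0, dot) : v);
    }

    public static boolean isSupported(String specVersion) {
        return majorVersion(specVersion) >= MINIMUM_MAJOR;
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        System.out.println("Java specification version " + spec
                + (isSupported(spec) ? " is new enough." : " is too old for Cassandra 0.8."));
    }
}
```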

Downloading and Unzipping Apache Cassandra.

To begin, download Apache Cassandra from the Apache Cassandra Download Page (I downloaded the latest release, version 0.8.2: apache-cassandra-0.8.2-bin.tar.gz). Extract the archive to the root directory (preferable for Windows users) or a directory of your choice (I'm using Windows 7 Ultimate, and I extracted the archive to the c:\apache-cassandra-0.8.2\ folder).


Before we continue, we need to set up 2 important environment variables: CASSANDRA_HOME and JAVA_HOME. CASSANDRA_HOME must point to your Cassandra installation directory (not its bin folder), and JAVA_HOME must point to your Java installation directory (you should know this by now if you're a Java developer! .NET developers, there's Google! :p)
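A quick way to catch the classic mistake of pointing a variable at the bin folder is a check like the one below. It's a sketch of my own (the class and method names are made up); it only assumes that both variables point at installation roots containing a bin folder, and that a Cassandra 0.7+ installation has conf/cassandra.yaml.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not part of Cassandra): sanity-checks JAVA_HOME and
 * CASSANDRA_HOME before you try to start Cassandra.
 */
public class CassandraEnvCheck {

    /** Returns human-readable problems; an empty list means the setup looks fine. */
    public static List<String> check(Map<String, String> env) {
        List<String> problems = new ArrayList<String>();
        checkDir(env, "JAVA_HOME", "bin", problems);
        checkDir(env, "CASSANDRA_HOME", "bin", problems);
        String cassandraHome = env.get("CASSANDRA_HOME");
        if (cassandraHome != null && cassandraHome.trim().length() > 0
                && !new File(new File(cassandraHome, "conf"), "cassandra.yaml").isFile()) {
            problems.add("CASSANDRA_HOME has no conf/cassandra.yaml");
        }
        return problems;
    }

    private static void checkDir(Map<String, String> env, String name,
                                 String child, List<String> problems) {
        String value = env.get(name);
        if (value == null || value.trim().length() == 0) {
            problems.add(name + " is not set");
        } else if (!new File(value, child).isDirectory()) {
            // The variable must point at the installation root, not at its bin folder
            problems.add(name + " should point to a directory containing " + child);
        }
    }

    public static void main(String[] args) {
        List<String> problems = check(System.getenv());
        if (problems.isEmpty()) {
            System.out.println("Environment looks OK");
        }
        for (String problem : problems) {
            System.out.println("Problem: " + problem);
        }
    }
}
```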


Now that we're set, let's have some fun!

Setting up and running Apache Cassandra.

Prior to Apache Cassandra 0.7, storage was configured in a file called storage-conf.xml in your CASSANDRA_HOME/conf folder (if memory serves me correctly). This no longer applies from Apache Cassandra 0.7 onwards: the storage configuration now lives in CASSANDRA_HOME/conf/cassandra.yaml. For more information on storage configuration in Cassandra, visit the Apache Cassandra Wiki page StorageConfiguration.
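To see which cluster name and Thrift port your node will use, you can look at top-level settings such as cluster_name and rpc_port in cassandra.yaml. The sketch below is a deliberately naive reader (a hypothetical helper, not a YAML parser): it only picks up unindented key: value lines, skips comments, nested entries and list items, strips surrounding quotes, and does not handle inline comments. For anything more, use a real YAML library.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: a naive reader for top-level "key: value" lines of
 * cassandra.yaml. Not a YAML parser; nested blocks and lists are skipped.
 */
public class CassandraYamlPeek {

    public static Map<String, String> readTopLevel(Reader source) throws IOException {
        Map<String, String> settings = new LinkedHashMap<String, String>();
        BufferedReader reader = new BufferedReader(source);
        String line;
        while ((line = reader.readLine()) != null) {
            // Skip blank lines, comments, indented (nested) lines and list items
            if (line.trim().length() == 0 || line.startsWith("#")
                    || Character.isWhitespace(line.charAt(0)) || line.startsWith("-")) {
                continue;
            }
            int colon = line.indexOf(':');
            if (colon <= 0) {
                continue;
            }
            String key = line.substring(0, colon).trim();
            settings.put(key, unquote(line.substring(colon + 1).trim()));
        }
        return settings;
    }

    private static String unquote(String value) {
        if (value.length() >= 2) {
            char first = value.charAt(0);
            char last = value.charAt(value.length() - 1);
            if ((first == '\'' || first == '"') && first == last) {
                return value.substring(1, value.length() - 1);
            }
        }
        return value;
    }

    public static void main(String[] args) throws IOException {
        // Pass the path, e.g. c:\apache-cassandra-0.8.2\conf\cassandra.yaml
        Reader in = new FileReader(args[0]);
        try {
            Map<String, String> settings = readTopLevel(in);
            System.out.println("cluster_name = " + settings.get("cluster_name"));
            System.out.println("rpc_port     = " + settings.get("rpc_port"));
        } finally {
            in.close();
        }
    }
}
```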

To run Cassandra, go to the CASSANDRA_HOME/bin folder and type the following command:

cassandra -f

(The -f option tells Cassandra to run in the foreground rather than as a daemon process.)

If you want to record the Cassandra process ID in a file, use the -p option, e.g. cassandra -p /var/cassandra.pid

To check that Cassandra started successfully, look for a message like the following in your bash/shell window or Command Prompt (ignore the date/time stamp):

INFO [Thread-4] 2011-08-03 12:33:48,880 CassandraDaemon.java (line 145) Listening for thrift clients...

If you see the text "Listening for thrift clients...", you can start smiling. :-)

Now, let's check that Cassandra is truly running. Start the Cassandra CLI (Command Line Interface) in another shell/command prompt. In the CASSANDRA_HOME/bin folder, type

cassandra-cli

You should see output similar to mine below:

Starting Cassandra Client
Welcome to the Cassandra CLI.


Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.


[default@unknown]

Once you see the [default@unknown] prompt, type the following command (in the Cassandra CLI, every command must be terminated with a semicolon; otherwise the CLI displays an ellipsis, ..., and waits for one):

connect localhost/9160; 

A successful response to the above command looks something like this: Connected to: "Test Cluster" on localhost/9160

Another way to connect to Cassandra with Cassandra CLI is to pass the connection parameters when calling cassandra-cli, as follows:

cassandra-cli -host localhost -port 9160

(9160 is Cassandra's default Thrift port.) The result is exactly the same as above.
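If cassandra-cli can't connect, it helps to rule out the network first. The sketch below (a hypothetical helper using plain JDK sockets, no Thrift) only checks that something accepts TCP connections on a host and port; it says nothing about whether that something speaks Thrift, so the CLI remains the real test.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: is anything accepting TCP connections on host:port? */
public class ThriftPortCheck {

    public static boolean isListening(String host, int port, int timeoutMillis) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if close fails
            }
        }
    }

    public static void main(String[] args) {
        boolean up = isListening("localhost", 9160, 2000);
        System.out.println(up ? "Something is listening on localhost:9160"
                              : "Nothing is listening on localhost:9160");
    }
}
```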

Now, let's check that Cassandra is truly listening for Thrift clients. In the Cassandra CLI, type

show keyspaces;

If you see a list of keyspaces (a fresh installation lists only the built-in "system" keyspace), then Cassandra is up and running.

Type help; in the CLI to see a list of available Cassandra CLI commands. :-)

 

Java Examples: Using Apache Thrift.

For this demonstration, I am using Thrift 0.6.1 (the latest version at the time of writing) from Apache Thrift.

Cassandra now expects java.nio.ByteBuffer where it used byte arrays in Cassandra 0.6 and lower. I suggest that you always refer to the Cassandra API wiki page when you're interfacing with Cassandra through Thrift.
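Since every name, key and value now travels as a ByteBuffer, it's handy to have a couple of conversion helpers. The class below is a hypothetical sketch (the name is mine); it assumes UTF8Type names and values are UTF-8 encoded strings, and that LongType values are 8-byte big-endian longs, which matches Java's default ByteBuffer byte order. I believe the Cassandra jar also ships org.apache.cassandra.utils.ByteBufferUtil with similar helpers, if you'd rather not write your own.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

/** Hypothetical helpers for converting values to and from Thrift ByteBuffers. */
public final class CassandraBytes {

    private static final Charset UTF8 = Charset.forName("UTF-8");

    private CassandraBytes() {
    }

    /** Encodes a string as UTF-8, the encoding UTF8Type expects. */
    public static ByteBuffer utf8(String s) {
        return ByteBuffer.wrap(s.getBytes(UTF8));
    }

    /** Encodes a long as 8 big-endian bytes, the layout LongType expects. */
    public static ByteBuffer longValue(long v) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(0, v); // absolute put: position stays 0, limit stays 8
        return buf;
    }

    /** Decodes UTF-8 without moving the caller's buffer position. */
    public static String toUtf8String(ByteBuffer buf) {
        ByteBuffer copy = buf.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, UTF8);
    }

    /** Reads an 8-byte big-endian long without moving the buffer position. */
    public static long toLong(ByteBuffer buf) {
        return buf.getLong(buf.position());
    }
}
```

Reading through a duplicate (or with absolute gets) matters because a buffer whose position has been consumed would be sent to Cassandra as an empty value.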

I will demonstrate how to create a new keyspace, "Keyspace1", and a column family called "Authors" using Apache Thrift in Java. Note: prior to Cassandra 0.7, keyspace definitions, with all their column family declarations, were written in storage-conf.xml. This no longer applies: you will have to write code to create your keyspace and column family definitions.

My keyspace and the Authors column family are pictured as follows:

"Keyspace1": {
    "Authors": {
        "Alan Johnson": {
            "title": "The Elite Gentleman writes",
            "url": "http://theelitegentleman.blogspot.com/",
            "views": 10
        }
    }
}
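If the nesting is hard to picture, here's a toy, in-memory model of the same structure (purely illustrative; it says nothing about how Cassandra actually stores data). Each level is a sorted map: column family, then row key, then column name, then value. Sorting column names by String order loosely mirrors a UTF8Type comparator for ASCII names; values are plain strings here, whereas Cassandra stores raw bytes.

```java
import java.util.SortedMap;
import java.util.TreeMap;

/** Toy illustration only: keyspace -> column family -> row key -> column name -> value. */
public class ToyKeyspace {

    private final String name;
    // column family name -> (row key -> (column name -> value))
    private final SortedMap<String, SortedMap<String, SortedMap<String, String>>> columnFamilies =
            new TreeMap<String, SortedMap<String, SortedMap<String, String>>>();

    public ToyKeyspace(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    /** Writes one column; writing the same column name again overwrites the value. */
    public void insert(String columnFamily, String rowKey, String columnName, String value) {
        SortedMap<String, SortedMap<String, String>> rows = columnFamilies.get(columnFamily);
        if (rows == null) {
            rows = new TreeMap<String, SortedMap<String, String>>();
            columnFamilies.put(columnFamily, rows);
        }
        SortedMap<String, String> columns = rows.get(rowKey);
        if (columns == null) {
            columns = new TreeMap<String, String>();
            rows.put(rowKey, columns);
        }
        columns.put(columnName, value);
    }

    /** Returns the row's columns sorted by name, or null if the row doesn't exist. */
    public SortedMap<String, String> getRow(String columnFamily, String rowKey) {
        SortedMap<String, SortedMap<String, String>> rows = columnFamilies.get(columnFamily);
        return rows == null ? null : rows.get(rowKey);
    }

    public static void main(String[] args) {
        ToyKeyspace keyspace = new ToyKeyspace("Keyspace1");
        keyspace.insert("Authors", "Alan Johnson", "url", "http://theelitegentleman.blogspot.com/");
        keyspace.insert("Authors", "Alan Johnson", "title", "The Elite Gentleman writes");
        keyspace.insert("Authors", "Alan Johnson", "views", "10");
        // Columns come back sorted by name: title, url, views
        System.out.println(keyspace.getName() + " / Authors / Alan Johnson -> "
                + keyspace.getRow("Authors", "Alan Johnson"));
    }
}
```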

And here's my first attempt at the full code; I've added comments for easy understanding :-)



package org.apache.cassandra.example;
 
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
 
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.IndexType;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.SchemaDisagreementException;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
 
/**
 * @author The Elite Gentleman
 * @since 03 August 2011
 *
 */
public class CassandraThriftExample {
     
    private static final String CASSANDRA_HOST = "localhost";
    private static final int CASSANDRA_PORT = 9160;
    private static final String CASSANDRA_KEYSPACE = "Keyspace1";
    private static final String CHARSET = "UTF-8";
 
    /**
     * @param args
     */
    public static void main(String[] args) {
        //Cassandra 0.7+ uses framed transport on the Thrift port by default
        TTransport transport = new TFramedTransport(new TSocket(CASSANDRA_HOST, CASSANDRA_PORT));
        TProtocol protocol = new TBinaryProtocol(transport);
        Cassandra.Client client = new Cassandra.Client(protocol);
         
        try {
            transport.open();
             
            //Drop the keyspace if it already exists. Dropping a keyspace that doesn't
            //exist throws an InvalidRequestException, so ignore it on a first run.
            try {
                client.system_drop_keyspace(CASSANDRA_KEYSPACE);
            } catch (InvalidRequestException e) {
                //Nothing to drop yet
            }
             
            String authorsColumnFamilyName = "Authors";
            //Since Cassandra 0.7, we create our own keyspace through the API
            KsDef keyspaceDefinition = new KsDef();
            keyspaceDefinition.name = CASSANDRA_KEYSPACE;
            keyspaceDefinition.strategy_class = SimpleStrategy.class.getName();
            //Marks the deprecated integer replication_factor field as set; its value stays 0
            keyspaceDefinition.setReplication_factorIsSet(true);
             
            //Now, create a column family definition
            CfDef authorsCfDefinition = new CfDef(CASSANDRA_KEYSPACE, authorsColumnFamilyName);
            authorsCfDefinition.comparator_type = "UTF8Type";
            ColumnDef titleColumnDefinition = new ColumnDef(ByteBuffer.wrap("title".getBytes(CHARSET)), "UTF8Type");
            ColumnDef urlColumnDefinition = new ColumnDef(ByteBuffer.wrap("url".getBytes(CHARSET)), "UTF8Type");
            ColumnDef viewsColumnDefinition = new ColumnDef(ByteBuffer.wrap("views".getBytes(CHARSET)), "LongType");
            //Create a secondary (KEYS) index on "views"
            viewsColumnDefinition.index_type = IndexType.KEYS;
             
            //Add ColumnDef to CfDef
            authorsCfDefinition.addToColumn_metadata(titleColumnDefinition);
            authorsCfDefinition.addToColumn_metadata(urlColumnDefinition);
            authorsCfDefinition.addToColumn_metadata(viewsColumnDefinition);
             
            //Set CfDefs to KsDef
            keyspaceDefinition.cf_defs = Arrays.asList(authorsCfDefinition);
             
            //Now, let Cassandra create our new Keyspace with the KsDef we defined.
            client.system_add_keyspace(keyspaceDefinition);
             
            //Switch to our keyspace, just like the "USE <keyspace>;" command in Cassandra-CLI.
            client.set_keyspace(CASSANDRA_KEYSPACE);
             
            //Column timestamp: by convention, Cassandra clients use microseconds since the epoch
            long now = System.currentTimeMillis() * 1000;
             
            //Write to the "Authors" column family, row key "Alan Johnson"
            ColumnParent columnParent = new ColumnParent(authorsColumnFamilyName);
            ByteBuffer keyBuffer = ByteBuffer.wrap("Alan Johnson".getBytes(CHARSET));
             
            Column titleColumn = new Column(ByteBuffer.wrap("title".getBytes(CHARSET)));
            titleColumn.setValue(ByteBuffer.wrap("The Elite Gentleman writes".getBytes(CHARSET)));
            titleColumn.setTimestamp(now);
            client.insert(keyBuffer, columnParent, titleColumn, ConsistencyLevel.ONE);
             
            Column urlColumn = new Column(ByteBuffer.wrap("url".getBytes(CHARSET)));
            urlColumn.setValue(ByteBuffer.wrap("http://theelitegentleman.blogspot.com/".getBytes(CHARSET)));
            urlColumn.setTimestamp(now);
            client.insert(keyBuffer, columnParent, urlColumn, ConsistencyLevel.ONE);
 
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (InvalidRequestException e) {
            e.printStackTrace();
        } catch (UnavailableException e) {
            e.printStackTrace();
        } catch (TimedOutException e) {
            e.printStackTrace();
        } catch (SchemaDisagreementException e) {
            e.printStackTrace();
        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        } finally {
            transport.close();
        }
    }
}

If you run this code, the inserts fail with an UnavailableException. The Cassandra API gives a simple one-line explanation of what UnavailableException means:


Not all the replicas required could be created and/or read.

The cause is simple: I am using SimpleStrategy, and SimpleStrategy requires a replication factor. (For NetworkTopologyStrategy, you have to specify each data centre and its number of replicas in the strategy_options; more information can be found here.) Cassandra 0.8 no longer uses KsDef's integer replication_factor field (it is deprecated), so we need to put the replication factor into the Map<String, String> strategy_options instead. Note that setReplication_factorIsSet(true) only marks the deprecated field as set; it never gives it a value.

The following code shows how:

//Requires: import java.util.LinkedHashMap;
KsDef keyspaceDefinition = new KsDef();
keyspaceDefinition.name = CASSANDRA_KEYSPACE;
keyspaceDefinition.strategy_class = SimpleStrategy.class.getName();
 
//Set replication factor
if (keyspaceDefinition.strategy_options == null) {
    keyspaceDefinition.strategy_options = new LinkedHashMap<String, String>();
}
 
keyspaceDefinition.strategy_options.put("replication_factor", "1");


Finally, remove the line keyspaceDefinition.setReplication_factorIsSet(true); and we're good to go.
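If you switch strategies later, it helps to keep the option-building in one place. The class below is a hypothetical sketch (the names are mine, not Cassandra's) that only builds the plain Map<String, String> you would assign to keyspaceDefinition.strategy_options. It assumes SimpleStrategy reads a single "replication_factor" entry and NetworkTopologyStrategy reads one entry per data centre, whose names must match what your snitch reports. It rejects a SimpleStrategy factor below 1, since that would leave no replicas to write to.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helpers for building KsDef.strategy_options maps (all values are strings). */
public class StrategyOptions {

    /** SimpleStrategy: a single, cluster-wide replication factor. */
    public static Map<String, String> simple(int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replication factor must be at least 1");
        }
        Map<String, String> options = new LinkedHashMap<String, String>();
        options.put("replication_factor", Integer.toString(replicationFactor));
        return options;
    }

    /** NetworkTopologyStrategy: one entry per data centre name, e.g. "datacenter1" -> 1. */
    public static Map<String, String> networkTopology(Map<String, Integer> replicasPerDataCentre) {
        Map<String, String> options = new LinkedHashMap<String, String>();
        for (Map.Entry<String, Integer> entry : replicasPerDataCentre.entrySet()) {
            if (entry.getValue() < 0) {
                throw new IllegalArgumentException("negative replica count for " + entry.getKey());
            }
            options.put(entry.getKey(), entry.getValue().toString());
        }
        return options;
    }

    public static void main(String[] args) {
        System.out.println(simple(1)); // {replication_factor=1}
        Map<String, Integer> dataCentres = new LinkedHashMap<String, Integer>();
        dataCentres.put("datacenter1", 1);
        System.out.println(networkTopology(dataCentres)); // {datacenter1=1}
    }
}
```

With it, the keyspace code would shrink to keyspaceDefinition.strategy_options = StrategyOptions.simple(1);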

The final code below throws no exceptions.

package org.apache.cassandra.example;
 
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedHashMap;
 
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.IndexType;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.SchemaDisagreementException;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
 
/**
 * @author The Elite Gentleman
 * @since 03 August 2011
 *
 */
public class CassandraThriftExample {
     
    private static final String CASSANDRA_HOST = "localhost";
    private static final int CASSANDRA_PORT = 9160;
    private static final String CASSANDRA_KEYSPACE = "Keyspace1";
    private static final String CHARSET = "UTF-8";
 
    /**
     * @param args
     */
    public static void main(String[] args) {
        //Cassandra 0.7+ uses framed transport on the Thrift port by default
        TTransport transport = new TFramedTransport(new TSocket(CASSANDRA_HOST, CASSANDRA_PORT));
        TProtocol protocol = new TBinaryProtocol(transport);
        Cassandra.Client client = new Cassandra.Client(protocol);
         
        try {
            transport.open();
             
            //Drop the keyspace if it already exists. Dropping a keyspace that doesn't
            //exist throws an InvalidRequestException, so ignore it on a first run.
            try {
                client.system_drop_keyspace(CASSANDRA_KEYSPACE);
            } catch (InvalidRequestException e) {
                //Nothing to drop yet
            }
             
            String authorsColumnFamilyName = "Authors";
            //Since Cassandra 0.7, we create our own keyspace through the API
            KsDef keyspaceDefinition = new KsDef();
            keyspaceDefinition.name = CASSANDRA_KEYSPACE;
            keyspaceDefinition.strategy_class = SimpleStrategy.class.getName();
             
            //Set the replication factor through strategy_options (Cassandra 0.8)
            if (keyspaceDefinition.strategy_options == null) {
                keyspaceDefinition.strategy_options = new LinkedHashMap<String, String>();
            }
             
            keyspaceDefinition.strategy_options.put("replication_factor", "1");
             
            //Now, create a column family definition
            CfDef authorsCfDefinition = new CfDef(CASSANDRA_KEYSPACE, authorsColumnFamilyName);
            authorsCfDefinition.column_type = "Standard";
            authorsCfDefinition.comparator_type = "UTF8Type";
            ColumnDef titleColumnDefinition = new ColumnDef(ByteBuffer.wrap("title".getBytes(CHARSET)), "UTF8Type");
            ColumnDef urlColumnDefinition = new ColumnDef(ByteBuffer.wrap("url".getBytes(CHARSET)), "UTF8Type");
            ColumnDef viewsColumnDefinition = new ColumnDef(ByteBuffer.wrap("views".getBytes(CHARSET)), "LongType");
            //Create a secondary (KEYS) index on "views"
            viewsColumnDefinition.index_type = IndexType.KEYS;
             
            //Add ColumnDef to CfDef
            authorsCfDefinition.addToColumn_metadata(titleColumnDefinition);
            authorsCfDefinition.addToColumn_metadata(urlColumnDefinition);
            authorsCfDefinition.addToColumn_metadata(viewsColumnDefinition);
             
            //Set CfDefs to KsDef
            keyspaceDefinition.cf_defs = Arrays.asList(authorsCfDefinition);
             
            //Now, let Cassandra create our new Keyspace with the KsDef we defined.
            client.system_add_keyspace(keyspaceDefinition);
             
            //Switch to our keyspace, just like the "USE <keyspace>;" command in Cassandra-CLI.
            client.set_keyspace(CASSANDRA_KEYSPACE);
             
            //Column timestamp: by convention, Cassandra clients use microseconds since the epoch
            long now = System.currentTimeMillis() * 1000;
             
            //Write to the "Authors" column family, row key "Alan Johnson"
            ColumnParent columnParent = new ColumnParent(authorsColumnFamilyName);
            String key = "Alan Johnson";
             
            Column titleColumn = new Column(ByteBuffer.wrap("title".getBytes(CHARSET)));
            titleColumn.setValue(ByteBuffer.wrap("The Elite Gentleman writes".getBytes(CHARSET)));
            titleColumn.setTimestamp(now);
             
            Column urlColumn = new Column(ByteBuffer.wrap("url".getBytes(CHARSET)));
            urlColumn.setValue(ByteBuffer.wrap("http://theelitegentleman.blogspot.com/".getBytes(CHARSET)));
            urlColumn.setTimestamp(now);
             
            //"views" is declared as LongType, so its value must be an 8-byte big-endian long
            Column viewsColumn = new Column(ByteBuffer.wrap("views".getBytes(CHARSET)));
            viewsColumn.setValue(ByteBuffer.allocate(8).putLong(0, 10L));
            viewsColumn.setTimestamp(now);
             
            client.insert(ByteBuffer.wrap(key.getBytes(CHARSET)), columnParent, titleColumn, ConsistencyLevel.ONE);
            client.insert(ByteBuffer.wrap(key.getBytes(CHARSET)), columnParent, urlColumn, ConsistencyLevel.ONE);
            client.insert(ByteBuffer.wrap(key.getBytes(CHARSET)), columnParent, viewsColumn, ConsistencyLevel.ONE);
             
            //No explicit flush is needed: each client call sends its request and waits for the reply
 
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (InvalidRequestException e) {
            e.printStackTrace();
        } catch (UnavailableException e) {
            e.printStackTrace();
        } catch (TimedOutException e) {
            e.printStackTrace();
        } catch (SchemaDisagreementException e) {
            e.printStackTrace();
        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        } finally {
            transport.close();
        }
    }
}


Hooray, it works!!! :-)

In a follow-up tutorial, I will show how to connect with Hector (a Java client for Cassandra). Hector comes with wonderful features, such as connection pooling for Cassandra, JMX support, etc. For more information on Hector, head over to the Hector site.


Have fun!

PS: I hope a related StackOverflow question will be useful too. :-)

Comments

  1. Good post. One more thing to know: if we use NetworkTopologyStrategy, then we have to set a per-data-centre option like this:

    keyspaceDefinition.strategy_options.put("datacenter1", "1");

