I started using Cassandra 1.2.5. I created a keyspace and a table with a compound key using
CQL3.
create keyspace test_keyspace with replication = {'class': 'SimpleStrategy', 'replication_factor':1};
create table test_table ( k1 bigint, k2 bigint, created timestamp, PRIMARY KEY (k1, k2) ) with compaction = { 'class' : 'LeveledCompactionStrategy' };
My next task was to popuulate the table with a lot of data.
I used
sstableloader for the task, which uses input created via SSTableSimpleUnsortedWriter.
The code sample uses a simple key, not a compound key. I looked at the classes in the org.apache.cassandra.db.marshall package and found CompositeType, which looks like what I should be using.
Intuitively I thought that since my key is a compund key then the row key is a CompositeType and that the rest works as in the simple example, so I tried using the following code:
List<AbstractType<?>> compositeList = new ArrayList<AbstractType<?>>();
compositeList.add( LongType.instance );
compositeList.add( LongType.instance );
CompositeType compositeType = CompositeType.getInstance( compositeList );
SSTableSimpleUnsortedWriter sstableWriter = new SSTableSimpleUnsortedWriter(
new File( System.getProperty( "output" ) ),
new Murmur3Partitioner(),
"test_keyspace",
"test_table",
compositeType,
null,
64 );
long timestamp = System.currentTimeMillis();
long nanotimestamp = timestamp * 1000;
long k1 = 5L;
long k2 = 10L;
sstableWriter.newRow( compositeType.builder().add( bytes( k1 ) ).add( bytes( k2 ) ).build() );
sstableWriter.addColumn( bytes( "created" ), bytes( timestamp ), nanotimestamp );
sstableWriter.close();
I then loaded the sstable files to Cassandra using the command "sstableloader -v -debug test_keyspace/test_table/"
The command ends without any indication of a problem, but the table remains empty.
I went over the node log file and saw this cryptic exception:
java.lang.RuntimeException: java.lang.IllegalArgumentException
at org.apache.cassandra.service.RangeSliceVerbHandler.doVerb(RangeSliceVerbHandler.java:64)
at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:247)
at org.apache.cassandra.db.marshal.AbstractCompositeType.getBytes(AbstractCompositeType.java:51)
at org.apache.cassandra.db.marshal.AbstractCompositeType.getWithShortLength(AbstractCompositeType.java:60)
at org.apache.cassandra.db.marshal.AbstractCompositeType.split(AbstractCompositeType.java:126)
at org.apache.cassandra.db.filter.ColumnCounter$GroupByPrefix.count(ColumnCounter.java:96)
at org.apache.cassandra.db.filter.SliceQueryFilter.collectReducedColumns(SliceQueryFilter.java:164)
at org.apache.cassandra.db.filter.QueryFilter.collateColumns(QueryFilter.java:136)
at org.apache.cassandra.db.filter.QueryFilter.collateOnDiskAtom(QueryFilter.java:84)
at org.apache.cassandra.db.RowIteratorFactory$2.getReduced(RowIteratorFactory.java:106)
at org.apache.cassandra.db.RowIteratorFactory$2.getReduced(RowIteratorFactory.java:79)
at org.apache.cassandra.utils.MergeIterator$ManyToOne.consume(MergeIterator.java:114)
at org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:97)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at org.apache.cassandra.db.ColumnFamilyStore$3.computeNext(ColumnFamilyStore.java:1399)
at org.apache.cassandra.db.ColumnFamilyStore$3.computeNext(ColumnFamilyStore.java:1395)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at org.apache.cassandra.db.ColumnFamilyStore.filter(ColumnFamilyStore.java:1466)
at org.apache.cassandra.db.ColumnFamilyStore.getRangeSlice(ColumnFamilyStore.java:1443)
at org.apache.cassandra.service.RangeSliceVerbHandler.executeLocally(RangeSliceVerbHandler.java:46)
at org.apache.cassandra.service.RangeSliceVerbHandler.doVerb(RangeSliceVerbHandler.java:58)
... 4 more
I sent a question to the Cassandra user mailing list and got a reply from Aaron Morton which pointed me in the right direction (
http://thelastpickle.com/2013/01/11/primary-keys-in-cql)
I inserted a row manually and I used cassandra-cli to see what the data looks like:
RowKey: 5
=> (column=10:created, value=0000013f84be6288, timestamp=1372321637000000)
From this example see that the row key is a single Long value "5", and it has one composite column "10:created" with a timestamp value.
Thus the code should look like this:
List<AbstractType<?>> compositeList = new ArrayList<AbstractType<?>>();
compositeList.add( LongType.instance );
compositeList.add( LongType.instance );
CompositeType compositeType = CompositeType.getInstance( compositeList );
SSTableSimpleUnsortedWriter sstableWriter = new SSTableSimpleUnsortedWriter(
new File( System.getProperty( "output" ) ),
new Murmur3Partitioner(),
"test_keyspace",
"test_table",
compositeType,
null,
64 );
long timestamp = System.currentTimeMillis();
long nanotimestamp = timestamp * 1000;
long k1 = 5L;
long k2 = 10L;
sstableWriter.newRow( bytes( k1 ) );
sstableWriter.addColumn( compositeType.builder().add( bytes( k2 ) ).add( bytes( "created" ) ).build(), bytes( timestamp ), nanotimestamp );
sstableWriter.close();
Cassandra 2.0.3 added the CQLSSTableWriter implementation that allows inserting rows without needing to understand the details of how those map to the underlying storage engine (http://www.datastax.com/dev/blog/cassandra-2-0-1-2-0-2-and-a-quick-peek-at-2-0-3).
ReplyDelete