Archive for the ‘Data Integration’ Category

Big Kettle News

January 30th, 2012

Dear Kettle fans,

Today I’m excited to announce a few very important changes to the Pentaho Data Integration landscape. To me, the changes announced today compare favorably to reaching Kettle version 1.0 some 9 years ago, reaching version 2.0 with plugin support, or even open sourcing Kettle itself…

First of all…

Pentaho is once again open sourcing an important piece of software. Today we’re releasing all of our big data related software as open source. This includes all currently available capabilities to access HDFS, MongoDB, Cassandra and HBase, the specific VFS drivers we created, as well as the ability to execute work inside of Hadoop (MapReduce), Amazon EMR, Pig and so on.

This is important to you because it means that you can now use Kettle to integrate a multitude of technologies, ranging from files through relational databases to big data and NoSQL. In other words, you can do this without writing any code. Take a look at how easy it is to program for Hadoop MapReduce.

This part of today’s big news lets you use the best tool for the job, whatever that tool is. You can now combine the large set of steps and job entries with all the available data sources and use them to integrate everything. For Hadoop in particular, the time it takes to implement a MapReduce job is very short, which takes the sting out of long and costly training and testing cycles.

But that’s not all…

Pentaho Data Integration and the new big data plugins are now available under the Apache License 2.0. This makes it very easy to integrate Kettle or the plugins into third-party software. For Hadoop, all major distributions are already supported, including Amazon Elastic MapReduce, Apache Hadoop, Cloudera’s Distribution including Apache Hadoop (CDH), Cloudera Enterprise, EMC Greenplum HD, Hortonworks Data Platform powered by Apache Hadoop, and MapR’s M3 Free and M5 Editions.

The move of Kettle from the LGPL to the Apache License 2.0 was broadly supported by our community and acts as an open invitation for other projects (and companies) to integrate Kettle. I hope that more NoSQL, Big Data and Big Search communities will reach out to us so we can work together to broaden our portfolio even further. The way I see it, the Kettle community just got a whole lot bigger!

Where are the goodies?

The main landing page for the Big Data community is on our wiki, to emphasize our intention to work closely with the various communities to make Pentaho Big Data a success. You can find all the information there, including a set of videos, a PDI 4.3.0 preview download (including the Big Data plugins), Hadoop installation instructions, PRD configuration information and much more.

Thanks for taking the time to read this, and thanks for using Pentaho software!

Matt



Data Modeling

November 3rd, 2011

Dear data integration fans,

I’m a big fan of “appropriate” data modeling prior to doing any data integration work. For a number of folks out there that means creating an Enterprise Data Warehouse model in the classic Bill Inmon style. Others prefer modern modeling techniques like Data Vault, created by Dan Linstedt. However, the largest group of data warehouse architects uses a technique called dimensional modeling, championed by Ralph Kimball.

Using a modeling technique is very important since it brings structure to your data warehouse. These techniques, when applied correctly of course, help you avoid all sorts of pitfalls in the design of a data warehouse.

From my own experience, and from what I see in my own Kettle community, dimensional modeling is by far the most popular technique used to create data warehouses. For that reason (and the fact that I’m a huge fan of Kimball) I’ve always made sure to properly support the most complex part of the technique: the slowly changing dimension. For the most part, that has made Kettle an excellent choice when it comes to translating your dimensional model into ETL.
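As a quick refresher on what a (type 2) slowly changing dimension involves: when a tracked attribute changes, the current version of the dimension row is closed and a new version with its own technical key is inserted, so facts keep pointing at the version that was valid at the time. The Java sketch below shows that logic in memory under simplifying assumptions of our own (one tracked attribute, timestamps as plain long values, and hypothetical names such as Scd2Dimension and upsert). It illustrates the technique; it is not the code behind Kettle’s “Dimension lookup/update” step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** In-memory sketch of a type 2 slowly changing dimension (hypothetical names, one tracked attribute). */
public class Scd2Dimension {

    /** One version of a dimension row. */
    public static final class DimensionRow {
        public final long technicalKey;  // surrogate key, unique per version
        public final String naturalKey;  // business key, shared by all versions
        public final String attribute;   // the tracked attribute, e.g. a customer's city
        public final int version;
        public final long dateFrom;      // start of validity (inclusive)
        public long dateTo;              // end of validity (exclusive); Long.MAX_VALUE = current

        DimensionRow(long technicalKey, String naturalKey, String attribute,
                     int version, long dateFrom, long dateTo) {
            this.technicalKey = technicalKey;
            this.naturalKey = naturalKey;
            this.attribute = attribute;
            this.version = version;
            this.dateFrom = dateFrom;
            this.dateTo = dateTo;
        }
    }

    private final List<DimensionRow> rows = new ArrayList<>();
    private long nextTechnicalKey = 1;

    /** Returns the current (open) version for a natural key, or null if there is none. */
    public DimensionRow current(String naturalKey) {
        for (DimensionRow r : rows) {
            if (r.naturalKey.equals(naturalKey) && r.dateTo == Long.MAX_VALUE) {
                return r;
            }
        }
        return null;
    }

    /** Applies an incoming record valid from time ts; returns the technical key facts should use. */
    public long upsert(String naturalKey, String attribute, long ts) {
        DimensionRow cur = current(naturalKey);
        if (cur != null && Objects.equals(cur.attribute, attribute)) {
            return cur.technicalKey;           // unchanged: keep using the current version
        }
        int version = 1;
        if (cur != null) {
            cur.dateTo = ts;                   // changed: close the current version...
            version = cur.version + 1;
        }
        DimensionRow row = new DimensionRow(nextTechnicalKey++, naturalKey, attribute,
                                            version, ts, Long.MAX_VALUE);
        rows.add(row);                         // ...and insert a new one
        return row.technicalKey;
    }

    public List<DimensionRow> rows() {
        return rows;
    }
}
```

The linear scan in current() keeps the sketch short; a real implementation would look versions up through an index on the natural key.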

However, while these days there are open source tools like Quipu and RapidACE for Data Vault modeling, I was sad to see that not much exists for dimensional modeling in combination with Kettle.

So a few weeks ago I was doing some basic modeling for a new Pentaho logging data mart for PDI 4.3 EE. This data mart will be responsible for delivering easy-to-digest reports, analysis views and dashboards on the monitoring and logging of Pentaho servers. Initially I did this in a nice Eclipse plugin called UMLet, which resulted in a data model diagram.

While this result isn’t the worst diagram you can possibly imagine, there are a number of problems with the approach:

  • The information about dimensions, attributes, relationships, … is not captured in a structured way.
  • Export of the metadata is not possible in any usable format except for PDF and images.
  • UMLet, like so many UML and modeling tools, is a generic tool that also supports many other features I’m not interested in when I’m doing dimensional modeling. As a result, creating a model takes time and real effort.
  • The work needs to be used in your favorite ETL tool, so it makes sense to have it handy there instead of having to use a third-party tool.

So I thought: wouldn’t it be great if I had some sort of perspective in Spoon where I could do a bit of dimensional modeling based on a logical Pentaho metadata model?

Wouldn’t it be great if I could create a new metadata domain to hold all the star models for a certain data mart?

Then wouldn’t it be great if you could edit your star models in there?

The graphics don’t have to be anything fancy, I thought.  It just needs to automatically position the fact table in the middle and the dimensions around it…
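Putting the fact table in the middle with the dimensions around it is simple geometry: spread the dimensions at equal angles on a circle centered on the fact table. A minimal Java sketch, assuming a canvas where y grows downward and hypothetical names (StarLayout, Box, layout); it is not taken from the plugin itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: fact table in the middle, dimensions evenly spread on a circle around it. */
public class StarLayout {

    public static final class Box {
        public final String name;
        public final int x;   // center of the box on the canvas
        public final int y;

        Box(String name, int x, int y) {
            this.name = name;
            this.x = x;
            this.y = y;
        }
    }

    /**
     * Places the fact table at (cx, cy) and the dimensions at equal angles on a circle
     * of the given radius, starting at 12 o'clock and going clockwise (y grows downward).
     */
    public static List<Box> layout(String fact, List<String> dimensions, int cx, int cy, int radius) {
        List<Box> boxes = new ArrayList<>();
        boxes.add(new Box(fact, cx, cy));
        int n = dimensions.size();
        for (int i = 0; i < n; i++) {
            double angle = -Math.PI / 2 + 2 * Math.PI * i / n;
            int x = (int) Math.round(cx + radius * Math.cos(angle));
            int y = (int) Math.round(cy + radius * Math.sin(angle));
            boxes.add(new Box(dimensions.get(i), x, y));
        }
        return boxes;
    }
}
```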

Obviously, I would like to be able to edit the name, description and type of the dimension …

and depending on the type of dimension I would like to insert a bunch of default attributes…

Using the standard Kettle data grid, I should also be able to copy attributes and other metadata back and forth between the dimension dialogs and a spreadsheet.

In the fact table definition it would be cool if we could specify not only the facts but also the relationships to the dimensions…

Because that way we wouldn’t have to worry about how to draw the star model and we would know everything we would need to know.

If we had a tool like that, we would be able to generate the SQL to create the physical tables in a given target database…

Because if we had all that knowledge about the dimensions in metadata, we could nicely generate all the required data types, indexes and what not.
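To make the idea concrete, here is a minimal Java sketch of generating DDL from dimension metadata. Everything in it is an assumption for illustration: the class and method names, the logical type names, their mapping to generic SQL types, the names of the technical key and type 2 housekeeping columns (version, date_from, date_to), and the choice of lookup index. A real generator would have to emit the SQL dialect of the chosen target database.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: derive DDL for a dimension table from simple logical metadata. */
public class DimensionDdl {

    /** Assumed mapping from logical attribute types to generic SQL types. */
    private static final Map<String, String> SQL_TYPES = new LinkedHashMap<>();
    static {
        SQL_TYPES.put("STRING", "VARCHAR(100)");
        SQL_TYPES.put("INTEGER", "INTEGER");
        SQL_TYPES.put("NUMBER", "DECIMAL(18,4)");
        SQL_TYPES.put("DATE", "DATE");
    }

    /**
     * @param table      dimension table name
     * @param naturalKey business key column (treated as a STRING)
     * @param attributes attribute name to logical type, in column order
     * @param type2      add type 2 slowly changing dimension housekeeping columns
     */
    public static List<String> ddl(String table, String naturalKey,
                                   Map<String, String> attributes, boolean type2) {
        List<String> cols = new ArrayList<>();
        cols.add("  " + table + "_tk INTEGER NOT NULL PRIMARY KEY");  // technical (surrogate) key
        if (type2) {
            cols.add("  version INTEGER NOT NULL");
            cols.add("  date_from TIMESTAMP NOT NULL");
            cols.add("  date_to TIMESTAMP NOT NULL");
        }
        cols.add("  " + naturalKey + " " + SQL_TYPES.get("STRING") + " NOT NULL");
        for (Map.Entry<String, String> a : attributes.entrySet()) {
            String sqlType = SQL_TYPES.get(a.getValue());
            if (sqlType == null) {
                throw new IllegalArgumentException("Unknown logical type: " + a.getValue());
            }
            cols.add("  " + a.getKey() + " " + sqlType);
        }

        List<String> statements = new ArrayList<>();
        statements.add("CREATE TABLE " + table + " (\n" + String.join(",\n", cols) + "\n)");
        // Lookups go by natural key (plus the validity range for type 2), so index those columns
        String indexCols = type2 ? naturalKey + ", date_from, date_to" : naturalKey;
        statements.add("CREATE INDEX idx_" + table + "_lookup ON " + table + " (" + indexCols + ")");
        return statements;
    }
}
```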

And then it would be cool to also generate a template transformation to update the dimension and fact tables in the models…

Well, I thought it would be nice to have that sort of functionality.

Perhaps we could also create a physical Pentaho metadata domain (XMI) from the star domain, as well as Mondrian schemas and a PDF with documentation.

OK, so this is coming to a PDI release near you soon. I’ve only been working on it on and off for a few weeks, but you can try an early version here. Simply unzip it into the plugins folder of a PDI 4.3 build. The plugin requires 4.3 because that version already includes many of the libraries it needs, such as Pentaho metadata and reporting, so I don’t need to package those libraries with it. We can see later how to deploy it on 4.2 as well.

Please provide feedback here or in PDI-6890.

Until next time,

Matt



What is the biggest challenge for Big Data?

September 9th, 2011

I often think about the challenges organizations face with “Big Data”. While Big Data is a generic and overused term, what I am really referring to is an organization’s ability to disseminate, understand and ultimately benefit from increasing volumes of data. It is almost beyond question that in the future customers will be won or lost, competitive advantage will be gained or forfeited, and businesses will succeed or fail based on their ability to leverage their data assets.

What I consider the near-term challenges may be surprising: largely, I don’t think they are purely technical. There are enough wheels in motion now to almost guarantee that data accessibility will keep improving at a pace in line with the growth in data volume. Sure, there will continue to be lots of interesting technical innovation, but when organizations like Google are doing 10PB sorts on 8000 machines in just over 6 hours, we know the technical scope for Big Data exists. It will eventually flow down to the masses, and such scale will likely be achievable by most organizations in the next decade.
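To put the Google figure in perspective, a back-of-the-envelope calculation, assuming decimal units (1 PB = 10^15 bytes) and rounding the elapsed time down to exactly 6 hours, so both results are slight overestimates:

```latex
\[
\frac{10 \times 10^{15}\,\text{B}}{6 \times 3600\,\text{s}} \approx 4.6 \times 10^{11}\,\text{B/s} \approx 463\,\text{GB/s},
\qquad
\frac{463\,\text{GB/s}}{8000\ \text{machines}} \approx 58\,\text{MB/s per machine}.
\]
```

These figures count every byte once; a sort has to at least read and write each byte, so the actual I/O per machine is higher.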

Instead, I think the core problem that needs to be addressed relates to people and skills. There are lots of engineers who can build distributed systems, and orders of magnitude more who can operate them and fill them to the brim with captured data. But where I think we are lacking is in people who know what to do with the data: people who know how to make it actually useful. Sure, a BI industry exists today, but I think it is currently more focused on the engineering challenge of giving an organization faster and easier access to its existing knowledge than on reaching out into the distance and discovering new knowledge. People with pure data analysis and knowledge discovery skills are much harder to find, and they are the ones who will be front and center, driving the big data revolution: people you can give a few PB of data to and who can give you back information, discoveries, trends, factoids, patterns, beautiful visualizations and needles you didn’t even know were in the haystack.

These are people who can make a real and significant impact on an organization’s bottom line, or help solve some of the world’s problems when applied to R&D. Data Geeks are the people to be revered in the future, and hopefully we will see a steady increase in people wanting to grow up to be Data Scientists.



NSA, Accumulo & Hadoop

September 8th, 2011

I read yesterday that the NSA has submitted a proposal to Apache to incubate its Accumulo platform. According to the description, this is a key/value store built on top of Hadoop that appears to provide functionality similar to HBase, except that it adds “cell level access labels” to allow fine-grained access control. This is something you would expect as a requirement for many applications built at government agencies like the NSA. But it is also very important for organizations in health care, law enforcement and so on, where strict control over large volumes of privacy-sensitive data is required.
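To illustrate what cell level access labels mean: each cell carries a boolean expression over security labels, and a user may only read the cell if his or her set of authorizations satisfies that expression. Below is a simplified Java sketch of such a check, assuming labels combined with &, | and parentheses, where & binds tighter than |. The VisibilityCheck class is hypothetical; Accumulo has its own parser and evaluator with its own syntax rules (such as quoted labels), which are not modeled here.

```java
import java.util.Set;

/** Simplified sketch: does a set of authorizations satisfy a label like "admin&(audit|medical)"? */
public class VisibilityCheck {
    private final String expr;
    private final Set<String> auths;
    private int pos;

    private VisibilityCheck(String expr, Set<String> auths) {
        this.expr = expr.replace(" ", "");
        this.auths = auths;
    }

    /** An empty label is visible to everyone. */
    public static boolean canSee(String expression, Set<String> authorizations) {
        VisibilityCheck p = new VisibilityCheck(expression, authorizations);
        if (p.expr.isEmpty()) return true;
        boolean result = p.orExpr();
        if (p.pos != p.expr.length()) {
            throw new IllegalArgumentException("Unexpected input at position " + p.pos);
        }
        return result;
    }

    // orExpr := andExpr ('|' andExpr)*   (|= always parses the right-hand side)
    private boolean orExpr() {
        boolean value = andExpr();
        while (peek() == '|') { pos++; value |= andExpr(); }
        return value;
    }

    // andExpr := term ('&' term)*
    private boolean andExpr() {
        boolean value = term();
        while (peek() == '&') { pos++; value &= term(); }
        return value;
    }

    // term := '(' orExpr ')' | label
    private boolean term() {
        if (peek() == '(') {
            pos++;
            boolean value = orExpr();
            if (peek() != ')') throw new IllegalArgumentException("Missing ) at position " + pos);
            pos++;
            return value;
        }
        int start = pos;
        while (pos < expr.length() && isLabelChar(expr.charAt(pos))) pos++;
        if (start == pos) throw new IllegalArgumentException("Expected a label at position " + pos);
        return auths.contains(expr.substring(start, pos));
    }

    private char peek() {
        return pos < expr.length() ? expr.charAt(pos) : '\0';
    }

    private static boolean isLabelChar(char c) {
        return Character.isLetterOrDigit(c) || c == '_' || c == '-' || c == ':' || c == '.' || c == '/';
    }
}
```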

An interesting aspect of this is how it highlights the acceptance of Hadoop. Hadoop is no longer just a new technology scratching at the edges of the traditional database market, and it is no longer used only by startups and web companies. Contributions like this one from organizations such as the NSA make that clear, as does the amount of research and attention the data community at large is devoting to Hadoop (for example, last week at VLDB). No, Hadoop has become a proven and trusted platform that is now being used by traditional and conservative segments of the market.

 



IA Ventures — Jobs shout out

August 4th, 2011

My friends over at IA Ventures are looking to add both an Analyst and an Associate to their team. If Big Data, New York and start-ups are in your blood, I can’t think of a better VC to be involved with.

From the IA blog:

"IA Ventures funds early-stage Big Data companies creating competitive advantage through data and we’re looking for two start-up junkies to join our team – one full-time associate / community manager and one full time analyst. Because there are only four of us (we’re a start-up ourselves, in fact), we’ll need you to help us investigate companies, learn about industries, develop investment theses, perform internal operations, organize community events, and work with portfolio companies—basically, you can take on as much responsibility as you can handle."

Roger, Brad and the team continue to impress with their focus on Big Data, their strategic investments in monetizing data, and their knowledge of the industry in general.



Realtime Data Pipelines

August 1st, 2011

In life there are really two major types of data analytics. In the first, we don’t know what we want to know, so we need analytics to tell us what is interesting. This is broadly called discovery. In the second, we already know what we want to know; we just need analytics to give us that information, often repeatedly and as quickly as possible. This goes by names ranging from reporting and dashboarding to more general data transformation.

Typically we use the same techniques for both. We shove lots of data into a repository of some form (SQL, MPP SQL, NoSQL, HDFS, etc.), then run queries/jobs/processes across that data to retrieve the information we care about.

This makes sense for data discovery. If we don’t know what we want to know, having lots of data in a big pile that we can slice and dice in interesting ways is good. But when we already know what we want to know, repeatedly running batch-based processing across mounds of data to produce “updated” results from data that is often changing constantly can be highly inefficient.

Enter Realtime Data Pipelines. Data is fed in at one end, results are computed in real time as the data flows down the pipeline, and they come out the other end whenever relevant changes we care about occur. Data pipelines, workflows and streams are becoming much more relevant for processing massive amounts of data with real-time results. Moving relevant forms of analytics out of large repositories and into the actual data flow from producer to consumer will, I believe, be a fundamental step forward in big data management.
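The difference with batch processing is that each incoming event only updates a small piece of state, and output is produced only when something relevant changes. A minimal Java sketch of one such pipeline stage, assuming a hypothetical “notify when a running total crosses a threshold” use case (class and method names are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical pipeline stage: keeps running totals per key as events flow through
 * and emits a message downstream only when a total crosses the threshold,
 * instead of re-querying all stored data in a batch.
 */
public class ThresholdStage {
    private final Map<String, Long> totals = new HashMap<>();
    private final long threshold;
    private final Consumer<String> downstream;

    public ThresholdStage(long threshold, Consumer<String> downstream) {
        this.threshold = threshold;
        this.downstream = downstream;
    }

    /** Constant work per event: update one total, emit only on a relevant change. */
    public void onEvent(String key, long amount) {
        long before = totals.getOrDefault(key, 0L);
        long after = before + amount;
        totals.put(key, after);
        if (before < threshold && after >= threshold) {
            downstream.accept(key + " reached " + after);
        }
    }
}
```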

There are some emerging technologies looking to address this, more details to follow.

 



Real-time streaming data aggregation

July 28th, 2011

Dear Kettle users,

Most of you use a data integration engine to process data in a batch-oriented way. Pentaho Data Integration (Kettle) is typically deployed to run monthly, nightly or hourly workloads. Sometimes folks run micro-batches of work every minute or so. However, it’s less well known that our beloved transformation engine can also be used to stream data indefinitely (never ending) from a source to a target. This sort of data integration is sometimes referred to as “streaming”, “real-time”, “near real-time”, “continuous” and so on. Typical examples of situations where you have a never-ending supply of data that needs to be processed the instant it becomes available are JMS (Java Message Service), RDBMS log sniffing, online fraud analysis, web or application log sniffing or, of course… Twitter! Since Twitter is easily accessed, examples using it are common, and in this blog post we too will use this service to demo the capabilities of Pentaho Data Integration for processing streaming data.

Here’s what we want to do:

  1. Continuously read all the tweets that are being sent on Twitter.
  2. Extract all the hash-tags used
  3. Count the number of hash-tags used in a one-minute time-window
  4. Report on all the tags that are being used more than once
  5. Put the output in a browser window, continuously update every minute.

This is a very generic example, but its logic can be applied to other fields like JMS, HL7, log sniffing and so on. It differs from the excellent work Vincent from Open-BI described on his blog earlier this week in that his Talend job finishes whereas ours never ends, and ours performs time-based aggregation rather than aggregation over a finite data set.

Also note that in order for Kettle to fully support multiple streaming data sources, we would have to implement support for “windowed” (time-based) joins and other nifty things. We’ve seen very little demand for this sort of requirement in the past, perhaps because people don’t know it’s possible with Kettle. In any case, if you currently need full streaming data support, have a look at SQLStream; they can help you. SQLStream was co-founded by Pentaho’s Julian Hyde of Mondrian fame.
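For those wondering what a windowed join is: an event from one stream is matched with events from another stream that have the same key and arrived within a given time window, while older events are evicted so memory stays bounded. A minimal Java sketch, assuming both streams are fed in non-decreasing timestamp order; the class and method names are hypothetical and this is not something Kettle provides.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of a time-windowed join between a left and a right stream. */
public class WindowedJoin {

    public static final class Event {
        public final String key;
        public final long ts;
        public final String value;

        public Event(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    private final long windowMillis;
    private final Deque<Event> left = new ArrayDeque<>();
    private final Deque<Event> right = new ArrayDeque<>();

    public WindowedJoin(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Feeds one left-stream event; returns the "left+right" pairs it completes. */
    public List<String> onLeft(Event e) { return process(e, left, right, true); }

    /** Feeds one right-stream event; returns the "left+right" pairs it completes. */
    public List<String> onRight(Event e) { return process(e, right, left, false); }

    private List<String> process(Event e, Deque<Event> own, Deque<Event> other, boolean isLeft) {
        evict(left, e.ts);
        evict(right, e.ts);
        List<String> joined = new ArrayList<>();
        for (Event o : other) {                 // everything left in "other" is within the window
            if (o.key.equals(e.key)) {
                joined.add(isLeft ? e.value + "+" + o.value : o.value + "+" + e.value);
            }
        }
        own.addLast(e);                         // keep it so later events on the other side can match
        return joined;
    }

    /** Buffers are in timestamp order, so expired events are always at the front. */
    private void evict(Deque<Event> buffer, long now) {
        while (!buffer.isEmpty() && buffer.peekFirst().ts < now - windowMillis) {
            buffer.pollFirst();
        }
    }
}
```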

OK, let’s see how we can solve our little problem with Kettle instead…

1. Continuously read all the tweets that are being sent on Twitter.

For this we are going to use one of the public Twitter web services, one that delivers a never-ending stream of JSON messages:

http://stream.twitter.com/1/statuses/sample.json?delimited=length

Since the output never ends and has a specific format (with delimited=length, every JSON message is preceded by its length), I wrote a small “User Defined Java Class” (UDJC) script:

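import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.PostMethod;
import org.pentaho.di.cluster.SlaveConnectionManager;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    HttpClient client = SlaveConnectionManager.getInstance().createHttpClient();
    client.setTimeout(10000);
    client.setConnectionTimeout(10000);

    // Authenticate with the account passed in as the USERNAME and PASSWORD parameters
    Credentials creds = new UsernamePasswordCredentials(getParameter("USERNAME"), getParameter("PASSWORD"));
    client.getState().setCredentials(AuthScope.ANY, creds);
    client.getParams().setAuthenticationPreemptive(true);

    HttpMethod method = new PostMethod("http://stream.twitter.com/1/statuses/sample.json?delimited=length");

    BufferedReader reader = null;
    try {
        // Execute request, fail on anything other than HTTP 200
        //
        int result = client.executeMethod(method);
        if (result != HttpStatus.SC_OK) {
            throw new KettleException("Twitter returned HTTP status " + result);
        }

        // Read the response as UTF-8 characters, not raw bytes, so non-ASCII tweets survive
        //
        reader = new BufferedReader(new InputStreamReader(method.getResponseBodyAsStream(), "UTF-8"));

        StringBuffer bodyBuffer = new StringBuffer();
        int opened = 0;            // nesting depth of { }
        boolean inString = false;  // inside a "..." JSON string, where braces don't count
        boolean escaped = false;   // previous character was a backslash inside a string
        int c;
        while ((c = reader.read()) != -1 && !isStopped()) {
            char ch = (char) c;
            if (opened == 0 && ch != '{') continue; // skip length prefixes and line breaks
            bodyBuffer.append(ch);
            if (inString) {
                if (escaped) escaped = false;
                else if (ch == '\\') escaped = true;
                else if (ch == '"') inString = false;
            } else if (ch == '"') {
                inString = true;
            } else if (ch == '{') {
                opened++;
            } else if (ch == '}') {
                opened--;
                if (opened == 0) {
                    // one JSON block, pass it on!
                    //
                    Object[] r = createOutputRow(new Object[0], data.outputRowMeta.size());
                    r[0] = bodyBuffer.toString();
                    putRow(data.outputRowMeta, r);
                    bodyBuffer.setLength(0);
                }
            }
        }
    } catch (KettleException e) {
        throw e;
    } catch (Exception e) {
        throw new KettleException("Unable to get tweets", e);
    } finally {
        // Close the stream and free the connection (no reset(): there is no mark to return to)
        try { if (reader != null) reader.close(); } catch (Exception e) { }
        method.releaseConnection();
    }

    setOutputDone();
    return false;
}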

As the experienced UDJC writers among you will notice, this step never ends as long as the Twitter service keeps sending data. Depending on the stability and popularity of Twitter, that can be “a very long time”.

You could improve the code further so that it reconnects to the service every time the connection drops. Personally, I would not do this. I would rather have the transformation terminate with an error (as it is implemented now), send an alert (e-mail, database, SNMP) and restart the transformation in a loop in a job. That way you have a trace in case Twitter goes down for a few hours.

2. Extract all the hash-tags used

First we’ll parse the JSON returned by the Twitter service, extract the first 5 hash-tags from each message, split them up into 5 rows and count the tags…
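Outside of Kettle, the same hash-tag extraction boils down to a regular expression. A minimal Java sketch, assuming the tweet text has already been parsed out of the JSON message (in the transformation that is the job of the JSON parsing step). The class name and the tag pattern (letters, digits and underscores after a #) are our own assumptions, not Twitter’s exact rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: pull at most max hash-tags out of a tweet's text, one output row per tag. */
public class HashTags {
    // '#' followed by Unicode letters, digits or underscores, so non-ASCII tags are kept whole
    private static final Pattern TAG = Pattern.compile("#([\\p{L}\\p{N}_]+)");

    public static List<String> firstTags(String text, int max) {
        List<String> tags = new ArrayList<>();
        Matcher m = TAG.matcher(text);
        while (tags.size() < max && m.find()) {
            tags.add(m.group(1));   // the tag without its leading '#'
        }
        return tags;
    }
}
```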

3. Count the number of hash-tags used in a one-minute time-window

The counting is easy, since you can simply use a “Group by” step. However, how can we aggregate in a time-based fashion without too much tinkering? Well, we now have the “Single Threader” step, which has an option to aggregate in a time-based manner, so we might as well use it.

This step simply accumulates all records in memory until 60 seconds have passed and then performs one iteration of the single threaded execution of the specified transformation.  This is a special execution method that doesn’t use the typical parallel engine.  Another cool thing about this engine is that the records that go into the engine in the time-window can be grouped and sorted without the transformation being restarted every minute.
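The time-based batching idea itself is easy to sketch. The following Java class is a hypothetical illustration, not the Single Threader code: it buffers rows in memory and hands the whole batch to a callback once the window (60 seconds in our case) has passed. The clock is injected so the behavior can be tested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical sketch: buffer rows and process them as one batch per time window. */
public class TimeWindowBatcher<T> {
    private final long windowMillis;
    private final LongSupplier clock;               // injectable time source, in milliseconds
    private final Consumer<List<T>> processBatch;
    private List<T> buffer = new ArrayList<>();
    private long windowStart = -1;

    public TimeWindowBatcher(long windowMillis, LongSupplier clock, Consumer<List<T>> processBatch) {
        this.windowMillis = windowMillis;
        this.clock = clock;
        this.processBatch = processBatch;
    }

    public void add(T row) {
        long now = clock.getAsLong();
        if (windowStart < 0) windowStart = now;     // first row opens the first window
        if (now - windowStart >= windowMillis) {
            flush();                                // window elapsed: process it in one go
            windowStart = now;
        }
        buffer.add(row);
    }

    /** Processes whatever is buffered, e.g. when the input ends. */
    public void flush() {
        if (!buffer.isEmpty()) {
            List<T> batch = buffer;
            buffer = new ArrayList<>();
            processBatch.accept(batch);
        }
    }
}
```

This sketch only checks the clock when a row arrives, so a quiet window is closed by the next row or an explicit flush(); a production version would also want a timer.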

4. Report on all the tags that are being used more than once

The filtering is done with a simple “Filter Rows” step. However, thanks to the magic of the “Single Threader” step, we can also sort the rows in descending order of tag occurrence count within that one-minute time window. It’s also worth noting that if you have huge amounts of data, you can easily parallelize your work by starting multiple copies of the Single Threader step and/or with some clever data partitioning. In our case we could partition by hash-tag or re-aggregate the aggregated data.
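Per window, the group, filter and sort logic amounts to very little code. A minimal Java sketch with hypothetical names, which also shows hash partitioning by tag so that every occurrence of a tag ends up in the same copy of a step. Ordering ties by tag name, case-insensitively, is our own choice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the per-window report: count, keep tags used more than once, sort. */
public class WindowReport {

    /** Returns "tag;count" lines for one window, most used first. */
    public static List<String> report(List<String> tagsInWindow) {
        Map<String, Integer> counts = new HashMap<>();          // the "Group by"
        for (String tag : tagsInWindow) {
            counts.merge(tag, 1, Integer::sum);
        }

        List<Map.Entry<String, Integer>> kept = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) kept.add(e);                   // the "Filter Rows"
        }
        kept.sort((a, b) -> {                                    // descending count, then tag
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : String.CASE_INSENSITIVE_ORDER.compare(a.getKey(), b.getKey());
        });

        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Integer> e : kept) {
            lines.add(e.getKey() + ";" + e.getValue());
        }
        return lines;
    }

    /** Hash partitioning: all occurrences of a tag go to the same one of the given copies. */
    public static int partition(String tag, int copies) {
        return Math.floorMod(tag.hashCode(), copies);
    }
}
```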

5. Put the output in a browser window, continuously update every minute.

As shown in an earlier blog post, we can do this quite easily with a “Text File Output” step. However, we also want to print a small header, and a separator between the data from each minute, so the transformation needs a little extra scripting.

The script to print the header looks like this:

// Print the header only once: this relies on "out" keeping its value from one row to the next
var out;
if (out==null) {
    out = _step_.getTrans().getServletPrintWriter();
    out.println("'Real-time' twitter hashtag report, minute based");
    out.flush();
}

The separator between each minute is simple too:

// "nr" restarts at 1 in every one-minute window, so print a separator before each new window
if (nr==1) {
    var out = _step_.getTrans().getServletPrintWriter();
    out.println("============================================");
    out.println();
    out.flush();
}

You can execute this transformation on a Carte instance (4.2.0) and see the following output:

'Real-time' twitter hashtag report, minute based
=================================================

nr;hashtag;count;from;to
1;tatilmayonezi;5;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
2;AUGUST6THBUZZNIGHTCLUB;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
3;teamfollowback;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
4;ayamzaman;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
5;dnd;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
6;follow;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
7;malhação;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
8;rappernames;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
9;thingswelearnedontwitter;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
=================================================

1;ska;5;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
2;followplanetjedward;4;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
3;chistede3pesos;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
4;NP;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
5;rappernames;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
6;tatilmayonezi;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
7;teamfollowback;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
8;AvrilBeatsVolcano;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
9;CM6;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
10;followme;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
11;Leão;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
12;NewArtists;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
13;OOMF;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
14;RETWEET;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
15;sougofollow;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
16;swag;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
17;thingswelearnedontwitter;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000

...

For reference, I used the following URL to start the streaming report:

http://cluster:cluster@127.0.0.1:8282/kettle/executeTrans/?trans=%2Fhome%2Fmatt%2Ftest-stuff%2FTwitter Stream%2FRead a twitter stream.ktr&USERNAME=MyTwitterAccount&PASSWORD=MyPassword
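Note that the spaces in the transformation path above are not URL-encoded; browsers usually take care of that, but if you start the report from a program you have to encode the parameters yourself. A minimal Java sketch using the standard URLEncoder, assuming the host, port, path and parameter names shown above (the CarteUrl class is hypothetical, and authentication is left out). URLEncoder encodes spaces as “+”, which servlet containers decode back to spaces in query parameters.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;

/** Hypothetical sketch: build a Carte executeTrans URL with properly encoded query parameters. */
public class CarteUrl {

    private static String enc(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static String executeTransUrl(String baseUrl, String transPath, Map<String, String> params) {
        StringBuilder url = new StringBuilder(baseUrl)
                .append("/kettle/executeTrans/?trans=").append(enc(transPath));
        for (Map.Entry<String, String> p : params.entrySet()) {
            url.append('&').append(enc(p.getKey())).append('=').append(enc(p.getValue()));
        }
        return url.toString();
    }
}
```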

I placed the complete example over here in case you want to try this yourself on PDI/Kettle version 4.2.0-RC1 or later. One thing you could add to make it even cooler is to have this transformation send an e-mail every time a certain hash-tag is used more than 15 times in a given minute. That sort of alerting gives you easy access to emerging trends, events and memes.

Also take a look at this earlier blog post of mine, where I describe the internal cleanup mechanisms inside of Kettle that prevent our transformation from ever running out of memory or resources.

Until next time,

Matt


PlanetMySQL Voting: Vote UP / Vote DOWN

Real-time streaming data aggregation

Июль 28th, 2011

Dear Kettle users,

Most of you usually use a data integration engine to process data in a batch-oriented way.  Pentaho Data Integration (Kettle) is typically deployed to run monthly, nightly, hourly workloads.  Sometimes folks run micro-batches of work every minute or so.  However, it’s lesser known that our beloved transformation engine can also be used to stream data indefinitely (never ending) from a source to a target.  This sort of data integration is sometimes referred to as being “streaming“, “real-time“, “near real-time“, “continuous” and so on.  Typical examples of situations where you have a never-ending supply of data that needs to be processed the instance it becomes available are JMS (Java Message Service), RDBMS log sniffing, on-line fraud analyses, web or application log sniffing or of-course … Twitter!  Since Twitter is easily accessed it’s common for examples to pop up regarding it’s usage and in this blog post too we will use this service to demo the Pentaho Data Integration capabilities wrt to processing streaming data.

Here’s what we want to do:

  1. Continuously read all the tweets that are being sent on Twitter.
  2. Extract all the hash-tags used
  3. Count the number of hash-tags used in a one-minute time-window
  4. Report on all the tags that are being used more than once
  5. Put the output in a browser window, continuously update every minute.

This is a very generic example but the logic of this can be applied to different fields like JMS, HL7, log sniffing and so on.  It differs from the excellent work that Vincent from Open-BI described earlier this week on his blog in the sense that his Talend job finishes where ours will never end and where ours will do time-based aggregation in contrast to aggregation over a finite data set.

Also note that for Kettle to fully support multiple streaming data sources, we would have to implement support for “windowed” (time-based) joins and other nifty things. We’ve seen very little demand for this sort of requirement in the past, perhaps because people don’t know it’s possible with Kettle. In any case, if you currently need full streaming data support, have a look at SQLStream; they can help you. SQLStream was co-founded by Pentaho’s Julian Hyde of Mondrian fame.

OK, let’s see how we can solve our little problem with Kettle instead…

1. Continuously read all the tweets that are being sent on Twitter.

For this we are going to use one of the public Twitter web services, one that delivers a never-ending stream of JSON messages:

http://stream.twitter.com/1/statuses/sample.json?delimited=length
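The `delimited=length` parameter in that URL asks the service to prefix every message with its length. The UDJC script below ignores that prefix and splits the stream by counting braces instead. As an alternative, here is a minimal Java sketch that relies on the prefix, assuming each message arrives as a decimal byte count, a newline, and then exactly that many bytes of UTF-8 JSON, with blank keep-alive lines possibly in between. The class and method names are hypothetical and not part of Kettle.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch: split a "delimited=length" stream into JSON messages using the length prefix.
// Assumed framing: <decimal byte count>\r\n<exactly that many bytes of UTF-8 JSON>,
// with blank keep-alive lines allowed between messages.
final class LengthDelimitedReader {

    /** Reads the next message, or returns null when the stream ends. */
    static String readMessage(InputStream in) throws IOException {
        StringBuilder digits = new StringBuilder();
        int b;
        while ((b = in.read()) != -1) {
            if (b >= '0' && b <= '9') {
                digits.append((char) b);
            } else if (b == '\n' && digits.length() > 0) {
                break; // end of the length line
            }
            // '\r' and blank keep-alive lines are skipped
        }
        if (b == -1) {
            return null; // stream ended before a complete length line
        }
        int length = Integer.parseInt(digits.toString());
        byte[] body = in.readNBytes(length); // blocks until 'length' bytes or end of stream
        if (body.length < length) {
            throw new IOException("Stream ended inside a message");
        }
        // trim() drops a trailing \r\n in case the sender counts it as part of the message
        return new String(body, StandardCharsets.UTF_8).trim();
    }

    /** Convenience for tests: splits a complete byte array into messages. */
    static List<String> splitStream(byte[] data) {
        List<String> messages = new ArrayList<>();
        try (InputStream in = new ByteArrayInputStream(data)) {
            String msg;
            while ((msg = readMessage(in)) != null) {
                messages.add(msg);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return messages;
    }

    /** Frames one JSON string the way this sketch expects it (byte count, not char count). */
    static String frame(String json) {
        return json.getBytes(StandardCharsets.UTF_8).length + "\r\n" + json + "\r\n";
    }

    public static void main(String[] args) {
        String stream = "\r\n" + frame("{\"text\":\"a } b\"}") + frame("{\"text\":\"Le\u00e3o\"}");
        for (String msg : splitStream(stream.getBytes(StandardCharsets.UTF_8))) {
            System.out.println(msg);
        }
    }
}
```

Because the reader counts bytes, a `{` or `}` inside the tweet text cannot confuse it, whereas a brace counter has to track string literals to avoid that.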

Since the output is never-ending and has a specific format, I wrote a small “User Defined Java Class” (UDJC) script:

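import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.PostMethod;
import org.pentaho.di.cluster.SlaveConnectionManager;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
  HttpClient client = SlaveConnectionManager.getInstance().createHttpClient();
  client.setTimeout(10000);
  client.setConnectionTimeout(10000);

  // Authenticate with the Twitter account passed in as step parameters
  //
  Credentials creds = new UsernamePasswordCredentials(getParameter("USERNAME"), getParameter("PASSWORD"));
  client.getState().setCredentials(AuthScope.ANY, creds);
  client.getParams().setAuthenticationPreemptive(true);

  HttpMethod method = new PostMethod("http://stream.twitter.com/1/statuses/sample.json?delimited=length");

  BufferedReader reader = null;
  try {
    // Execute the request and fail fast on anything other than 200 OK
    //
    int result = client.executeMethod(method);
    if (result != HttpStatus.SC_OK) {
      throw new KettleException("Twitter returned HTTP status " + result);
    }

    // Read the never-ending response as UTF-8 characters
    //
    reader = new BufferedReader(new InputStreamReader(method.getResponseBodyAsStream(), "UTF-8"), 1000);

    StringBuffer bodyBuffer = new StringBuffer();
    int opened = 0;            // current brace nesting depth
    boolean inString = false;  // inside a JSON string literal?
    boolean escaped = false;   // previous character was a backslash inside a string?
    int c;
    while ( (c=reader.read())!=-1 && !isStopped()) {
      char ch = (char)c;
      bodyBuffer.append(ch);
      if (inString) {
        // braces inside tweet text must not change the nesting depth
        if (escaped) escaped = false;
        else if (ch=='\\') escaped = true;
        else if (ch=='"') inString = false;
        continue;
      }
      if (ch=='"') inString = true;
      else if (ch=='{') opened++;
      else if (ch=='}') {
        opened--;
        if (opened==0) {
          // one JSON block, pass it on!
          //
          Object[] r = createOutputRow(new Object[0], data.outputRowMeta.size());
          String jsonString = bodyBuffer.toString();

          // skip the length prefix that precedes each message (delimited=length)
          int startIndex = jsonString.indexOf("{");
          if (startIndex<0) startIndex=0;

          r[0] = jsonString.substring(startIndex);
          putRow(data.outputRowMeta, r);

          bodyBuffer.setLength(0);
        }
      }
    }
  } catch(KettleException e) {
    throw e;
  } catch(Exception e) {
    throw new KettleException("Unable to get tweets", e);
  } finally {
    // no reset() here: it fails without a prior mark(), and the reader is null if the request failed
    if (reader!=null) {
      try { reader.close(); } catch(Exception ignored) { }
    }
    method.releaseConnection();
  }

  setOutputDone();
  return false;
}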

As the experienced UDJC writers among you will notice, this step never ends as long as the Twitter service keeps on sending data. Depending on the stability and popularity of Twitter, that can be “a very long time”.

You could improve the code even further to reconnect to the service every time the connection drops. Personally, I would not do this. I would rather have the transformation terminate with an error (as it is implemented now), send an alert (e-mail, database, SNMP) and restart the transformation in a loop in a job. That way you have a trace in case Twitter goes down for a few hours.
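In Kettle itself this restart-with-alert approach is a job loop. Purely to illustrate the control flow, here is a hypothetical plain-Java sketch: the `Task` interface stands in for one run of the streaming transformation and the alert callback for the e-mail, database or SNMP notification. Neither is a Kettle API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of "terminate with an error, alert, restart in a loop".
final class RestartLoop {

    /** One run of the never-ending work, e.g. the streaming transformation (hypothetical). */
    interface Task {
        void run() throws Exception;
    }

    /**
     * Runs the task until it ends normally. After each failure an alert is sent and the task
     * is restarted after a pause, leaving one alert per outage as a trace. Gives up once the
     * initial run plus maxRestarts restarts have all failed. Returns the number of failures.
     */
    static int runWithRestarts(Task task, Consumer<Exception> alert, int maxRestarts, long pauseMillis) {
        int failures = 0;
        while (true) {
            try {
                task.run();
                return failures; // the stream ended normally
            } catch (Exception e) {
                failures++;
                alert.accept(e); // e-mail, database row, SNMP trap, ...
                if (failures > maxRestarts) {
                    return failures;
                }
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return failures;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] runs = {0};
        List<String> alerts = new ArrayList<>();
        int failures = runWithRestarts(() -> {
            runs[0]++;
            if (runs[0] < 3) {
                throw new IllegalStateException("stream dropped");
            }
        }, e -> alerts.add(e.getMessage()), 5, 100);
        // prints "2 failures, 2 alerts, 3 runs"
        System.out.println(failures + " failures, " + alerts.size() + " alerts, " + runs[0] + " runs");
    }
}
```

Capping the number of restarts keeps a permanent outage from turning into an endless stream of alerts.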

2. Extract all the hash-tags used

First we’ll parse the JSON returned by the Twitter service, extract the first 5 hash-tags from each message, split this up into 5 rows and count the tags…
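In the transformation this happens in a handful of steps. As a stand-alone illustration of the same logic, here is a minimal Java sketch that works on the tweet text with a regular expression instead of parsing the JSON. The pattern (a `#` followed by Unicode letters, digits or underscores) is an assumption and will not match Twitter's own hash-tag rules exactly; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: take the first five hash-tags of each tweet, one "row" per tag, and count them.
// Assumption: regex over the tweet text instead of the JSON parsing done in the transformation.
final class HashtagExtractor {
    private static final Pattern HASHTAG = Pattern.compile("#([\\p{L}\\p{N}_]+)");
    static final int MAX_TAGS = 5;

    /** Returns at most MAX_TAGS tags, in the order they appear in the text. */
    static List<String> firstTags(String tweetText) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(tweetText);
        while (tags.size() < MAX_TAGS && m.find()) {
            tags.add(m.group(1));
        }
        return tags;
    }

    /** Splits every tweet into one row per tag and counts how often each tag occurs. */
    static Map<String, Integer> countTags(List<String> tweets) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String tweet : tweets) {
            for (String tag : firstTags(tweet)) {
                counts.merge(tag, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> tweets = List.of("#swag #NP now playing", "more #swag", "no tags here");
        System.out.println(countTags(tweets)); // prints {swag=2, NP=1}
    }
}
```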

3. Count the number of hash-tags used in a one-minute time-window

The counting is easy, as you can simply use a “Group by” step. However, how can we aggregate in a time-based fashion without too much tinkering? Well, we now have the “Single Threader” step, which has an option to aggregate in a time-based manner, so we might as well use it:

This step simply accumulates all records in memory until 60 seconds have passed and then performs one iteration of the single-threaded execution of the specified transformation. This is a special execution method that doesn’t use the typical parallel engine. Another cool thing about this engine is that the records that enter it during a time-window can be grouped and sorted without the transformation having to be restarted every minute.
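To make the time-window idea concrete, here is a minimal Java sketch of a tumbling one-minute window, assuming each row carries its arrival time in milliseconds. It only closes a window when a row arrives after the window has expired, whereas a real engine would typically also use a timer. The class is hypothetical and is not the Single Threader step's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of time-based aggregation: buffer rows for one window, then group and
// count them in a single pass. Timestamps are passed in explicitly to keep it testable.
final class TumblingWindow {
    private final long windowMillis;
    private long windowStart = -1;                 // -1: no window open yet
    private final List<String> buffer = new ArrayList<>();

    TumblingWindow(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /**
     * Adds one tag seen at nowMillis. If the current window has expired, the rows buffered
     * so far are aggregated and returned, and a new window starts with this row; otherwise null.
     */
    Map<String, Integer> add(long nowMillis, String tag) {
        Map<String, Integer> finished = null;
        if (windowStart < 0) {
            windowStart = nowMillis;
        } else if (nowMillis - windowStart >= windowMillis) {
            finished = flush();
            windowStart = nowMillis;
        }
        buffer.add(tag);
        return finished;
    }

    /** One "iteration": group and count the buffered rows, then empty the buffer. */
    Map<String, Integer> flush() {
        Map<String, Integer> counts = new HashMap<>();
        for (String tag : buffer) {
            counts.merge(tag, 1, Integer::sum);
        }
        buffer.clear();
        return counts;
    }

    public static void main(String[] args) {
        TumblingWindow window = new TumblingWindow(60_000);
        window.add(0, "ska");
        window.add(1_000, "ska");
        window.add(2_000, "NP");
        Map<String, Integer> firstMinute = window.add(60_000, "swag");
        System.out.println(firstMinute.get("ska") + " " + firstMinute.get("NP")); // prints "2 1"
    }
}
```

Buffering raw rows and aggregating once per window mirrors the accumulate-then-iterate behaviour described above; a running count per tag would use less memory but could not sort the window before it closes.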

4. Report on all the tags that are being used more than once

The filtering is done with a simple “Filter Rows” step. In addition, thanks to the magic of the “Single Threader” step, we can sort the rows in descending order of tag occurrence count within that one-minute time-window. It’s also interesting to note that if you have huge amounts of data, you can easily parallelize your work by starting multiple copies of the Single Threader step and/or with some clever data partitioning. In our case we could partition by hash-tag or re-aggregate the aggregated data.
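The filter, the descending sort and the partitioning idea fit in a few lines of Java. In this sketch, breaking ties alphabetically (ignoring case) is an assumption, and `partitionFor` is a hypothetical hash-based rule, not Kettle's partitioning API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of step 4: keep tags used more than once, most frequent first.
final class TagReport {

    /** Filters out tags used only once and sorts by count (descending), then by name. */
    static List<Map.Entry<String, Integer>> topTags(Map<String, Integer> counts) {
        List<Map.Entry<String, Integer>> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {                          // the "Filter Rows" condition
                rows.add(Map.entry(e.getKey(), e.getValue()));
            }
        }
        rows.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue()); // descending count
            return byCount != 0 ? byCount : String.CASE_INSENSITIVE_ORDER.compare(a.getKey(), b.getKey());
        });
        return rows;
    }

    /** Hypothetical partitioning rule: the same tag always goes to the same copy. */
    static int partitionFor(String tag, int copies) {
        return Math.floorMod(tag.hashCode(), copies);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("ska", 5);
        counts.put("rappernames", 3);
        counts.put("NP", 3);
        counts.put("dnd", 1);
        System.out.println(topTags(counts)); // prints [ska=5, NP=3, rappernames=3]
    }
}
```

Partitioning by tag means each copy sees every occurrence of its tags, so the per-copy counts are already final and need no re-aggregation.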

5. Put the output in a browser window, continuously update every minute.

As shown in an earlier blog post, we can do this quite easily with a “Text File Output” step. However, we also want to print a small header and a separator between each minute’s data, so we end up with a transformation that looks like this:

The script to print the header looks like this:

// 'out' is only null for the very first row, so the header is printed once
var out;
if (out==null) {
  out = _step_.getTrans().getServletPrintWriter();
  out.println("'Real-time' twitter hashtag report, minute based");
  out.flush();
}

The separator between each minute is simple too:

// 'nr' restarts at 1 in every one-minute window: print a separator before each new window
if (nr==1) {
  var out = _step_.getTrans().getServletPrintWriter();
  out.println("============================================");
  out.println();
  out.flush();
}
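For readers who prefer Java, here is a hypothetical sketch of the same two rules: print the header once, and print a separator whenever `nr` restarts at 1 for a new one-minute window. It writes to any `PrintWriter`, standing in for the one returned by `getServletPrintWriter()`, and leaves out the column header and the `from`/`to` fields for brevity.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical sketch of the report layout produced by the two script snippets.
final class StreamingReportWriter {
    private static final String HEADER = "'Real-time' twitter hashtag report, minute based";
    private static final String SEPARATOR = "============================================";

    private final PrintWriter out;
    private boolean headerPrinted = false;

    StreamingReportWriter(PrintWriter out) {
        this.out = out;
    }

    /** Writes one report row; nr is expected to restart at 1 in every window. */
    void writeRow(long nr, String hashtag, long count) {
        if (!headerPrinted) {          // very first row: print the header once
            out.println(HEADER);
            headerPrinted = true;
        }
        if (nr == 1) {                 // first row of a new window: separator and blank line
            out.println(SEPARATOR);
            out.println();
        }
        out.println(nr + ";" + hashtag + ";" + count);
        out.flush();                   // push each line to the client immediately
    }

    public static void main(String[] args) {
        StringWriter buffer = new StringWriter();
        StreamingReportWriter report = new StreamingReportWriter(new PrintWriter(buffer));
        report.writeRow(1, "tatilmayonezi", 5);
        report.writeRow(2, "teamfollowback", 3);
        report.writeRow(1, "ska", 5);
        System.out.print(buffer);
    }
}
```

Flushing after every row matters for a never-ending HTTP response: without it, the browser may see nothing until a buffer fills up.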

You can execute this transformation on a Carte instance (version 4.2.0) and see the following output:

'Real-time' twitter hashtag report, minute based
=================================================

nr;hashtag;count;from;to
1;tatilmayonezi;5;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
2;AUGUST6THBUZZNIGHTCLUB;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
3;teamfollowback;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
4;ayamzaman;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
5;dnd;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
6;follow;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
7;malhação;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
8;rappernames;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
9;thingswelearnedontwitter;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
=================================================

1;ska;5;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
2;followplanetjedward;4;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
3;chistede3pesos;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
4;NP;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
5;rappernames;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
6;tatilmayonezi;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
7;teamfollowback;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
8;AvrilBeatsVolcano;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
9;CM6;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
10;followme;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
11;Leão;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
12;NewArtists;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
13;OOMF;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
14;RETWEET;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
15;sougofollow;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
16;swag;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
17;thingswelearnedontwitter;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000

...

For reference, I used the following URL to start the streaming report:

http://cluster:cluster@127.0.0.1:8282/kettle/executeTrans/?trans=%2Fhome%2Fmatt%2Ftest-stuff%2FTwitter Stream%2FRead a twitter stream.ktr&USERNAME=MyTwitterAccount&PASSWORD=MyPassword
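The path in that URL contains spaces, which browsers usually tolerate but which Java's `java.net.URI`, for one, rejects. Here is a minimal Java sketch for building the same URL with every query value form-encoded; the helper name is hypothetical, and it assumes the servlet container decodes `+` back into a space, which is the standard form-encoding convention. The Carte credentials (`cluster:cluster` above) are left to the HTTP client's basic authentication.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: build the Carte executeTrans URL with form-encoded query values.
final class CarteUrl {

    static String executeTransUrl(String hostAndPort, String transPath, String twitterUser, String twitterPassword) {
        return "http://" + hostAndPort + "/kettle/executeTrans/"
                + "?trans=" + encode(transPath)
                + "&USERNAME=" + encode(twitterUser)
                + "&PASSWORD=" + encode(twitterPassword);
    }

    // URLEncoder produces application/x-www-form-urlencoded output: ' ' -> '+', '/' -> "%2F"
    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(executeTransUrl("127.0.0.1:8282",
                "/home/matt/test-stuff/Twitter Stream/Read a twitter stream.ktr",
                "MyTwitterAccount", "MyPassword"));
    }
}
```

Encoding each value separately also protects passwords that happen to contain `&` or `=`.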

I placed the complete example over here in case you want to try this yourself on PDI/Kettle version 4.2.0-RC1 or later. One thing you could add to make it even cooler is to have this transformation send an e-mail every time a certain hash-tag gets used more than 15 times in a given minute. That sort of alerting gives you easy access to emerging trends, events and memes.
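That alerting idea boils down to a threshold check on each window's counts. Here is a minimal Java sketch, assuming the counts for one minute are available as a map; the `sendAlert` callback is a hypothetical stand-in for whatever actually sends the e-mail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch: alert on every tag used more than `threshold` times in one window.
final class TrendAlerter {
    private final int threshold;
    private final BiConsumer<String, Integer> sendAlert; // e.g. wraps an e-mail sender

    TrendAlerter(int threshold, BiConsumer<String, Integer> sendAlert) {
        this.threshold = threshold;
        this.sendAlert = sendAlert;
    }

    /** Checks one window's counts and returns the tags that triggered an alert. */
    List<String> check(Map<String, Integer> windowCounts) {
        List<String> alerted = new ArrayList<>();
        for (Map.Entry<String, Integer> e : windowCounts.entrySet()) {
            if (e.getValue() > threshold) {
                sendAlert.accept(e.getKey(), e.getValue());
                alerted.add(e.getKey());
            }
        }
        return alerted;
    }

    public static void main(String[] args) {
        Map<String, Integer> minute = new HashMap<>();
        minute.put("ska", 16);
        minute.put("swag", 15);
        TrendAlerter alerter = new TrendAlerter(15,
                (tag, count) -> System.out.println("ALERT: #" + tag + " used " + count + " times"));
        alerter.check(minute); // only #ska crosses the "more than 15" threshold
    }
}
```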

For reference, take a look at this earlier blog post of mine where I describe the internal cleanup mechanisms inside of Kettle that prevent our transformation from ever running out of memory or resources.

Until next time,

Matt

