Really interesting!
Just to confirm I understood the basics correctly:
A write means an append to a timeseries, which is keyed by timestamp and identified by some name?
If a write succeeds only partially, different servers have different data. And a read might return any of these versions.
After some time the anti-entropy repair will kick in, and merge the diverging timeseries. Merging means taking the union of all data points.
Where do the timestamps come from, the client? So if a client retries a partially successful write, it'll have the same timestamp and will be merged during repair.
Are timestamps within a timeseries monotonically increasing?
The hinted handoff sounds like it is motivated by a similar problem to the one the Kafka in-sync replica (ISR) set tackles. Do you have any views on the pros/cons of your approach versus ISR sets? I think Kafka uses ZK for the ISR management, which means it wouldn't work with the availability requirements of InfluxDB, but could a modified version work?
So overall InfluxDB is sacrificing lots of consistency for availability.
Since the CP part of the system is actually cached, the entire system is really AP?
If not, what parts are not AP? Modification of the CP part, like creation of new timeseries?
From a user's perspective I could see it being useful to have a historical part of the timeseries that's guaranteed to be stable, and an in-flux part where the system hasn't settled yet. Then one could run expensive analytics on the historical part, without having to recalculate everything on the next read because the data could have changed in the meantime. You're already hashing your data and building a Merkle tree; maybe that would make it possible to implement something like that.
Timestamps should mostly be supplied by the client. They can be in the present or in the past; it doesn't matter.
If a write succeeds only partially, it will most likely be replicated to the other servers (and thus become consistent) by the hinted handoff queue. This should be a fast recovery. Anti-entropy is for a much longer-term failure that needs to be resolved.
Our use of hinted handoff and our goals for that are just borrowed ideas from Dynamo (the paper not the AWS DB), Cassandra, and Riak.
The issues around consistency are only for the failure cases. During normal operation you'd see a consistent view (within a second or some small delta).
Mostly the system is AP, with some parts that are CP. But if you really examine it, it's neither pure CP nor pure AP. It's some other thing.
That's fine too. However, queries by default set an end time of "now" on the server. So depending on how far in the future the point is, it may not show up in a query unless the end time is explicitly specified past the future timestamp of that point.
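For example (measurement and field names here are made up, and this is 0.9-era InfluxQL as I understand it, not something from the post), you'd widen the time range explicitly to pick up future-stamped points:

    SELECT value FROM temperature WHERE time > now() - 1h
    SELECT value FROM temperature WHERE time > now() - 1h AND time < '2016-01-01'

The first query effectively stops at "now", so a point stamped in the future won't appear; the second sets an explicit end time beyond it.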
What is the difference between this and Cassandra? A more powerful querying language?
Cassandra already has Consistency Levels with Replication Strategies. I feel like the only way to get powerful querying out of a system like this would be to have a MapReduce layer on top of your DB, which is what many do to get powerful querying from Cassandra.
This is purpose built for this use case. The MapReduce system you talk about is part of what's built in. Each aggregate function in the query language is represented as a MapReduce job that gets run on the fly.
The other part of it is that we're optimized for this use case. I've built "time series databases" on top of Cassandra before. It requires a great deal of application level code and hacking to get things like retention policies and continuous queries, which are built into InfluxDB.
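To make that concrete (the database, measurement, and policy names below are invented, and the syntax is 0.9-style InfluxQL as I understand it), a retention policy and a downsampling continuous query are declarations rather than application-level code:

    CREATE RETENTION POLICY two_weeks ON mydb DURATION 14d REPLICATION 2 DEFAULT
    CREATE CONTINUOUS QUERY cpu_mean_5m ON mydb BEGIN
      SELECT mean(value) INTO cpu_5m FROM cpu GROUP BY time(5m)
    END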
We're using Influx for analytics, basically storing events with lots of detailed metadata and data, allowing us to run ad-hoc aggregations like "select count(*) from viewed_product where product.category = ? and site = ? and user_agent = ? group by time(1d)". We're storing a lot of nested data; we are in fact merging in original data models so that we have historical event data even if the original data is deleted.
But reading about the changes in 0.9, I've noticed that you're moving towards a more fine-grained data model tuned for collecting measurements, with typically a single value per item + metadata in the form of tags, a lot like Prometheus' data model. This would seemingly make it even less appropriate for our use case, despite the fact that InfluxDB is apparently still intended for analytics. Influx also isn't performing very well at querying our data (compared to ElasticSearch and PostgreSQL), though I have been hoping this is because Influx is young and unoptimized and (as far as I understand) doesn't index any part of the value, only the tags.
Can you shed some light on this? Should we move off InfluxDB? Or will the new tag-based data model improve our use case?
The new model combines tags, which are indexed, with fields, which are not indexed. A measurement can have up to 255 different fields, all of which can be written for a single data point.
As we push things forward we'll be adding more analytics queries in. But for the time being it's more aptly suited for metrics and sensor data.
With 0.9.0 you should be able to use a combination of fields and tags to get some fairly sophisticated queries. Where clauses also work on both tags and field values.
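As a hypothetical illustration (the measurement, tag, and field names are mine), a single query can filter on an indexed tag and on an unindexed field value at the same time:

    SELECT mean(duration) FROM query_log
    WHERE time > now() - 1d AND host = 'db01' AND duration > 100
    GROUP BY time(1h)

Here host would be a tag (indexed, cheap to filter on) and duration a field (not indexed, but still usable in the WHERE clause and in aggregates).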
It might be easier to discuss on the InfluxDB Google Group. I'd like to hear more about your specific use case, the data you're writing in, and the kinds of questions you're asking of that data.
Interesting. So we'd see a lot of benefit from the new model.
Except our documents may easily have more than 255 fields. We would never need to query that much, mind you; but we don't know ahead of time what we'd need to query.
A nice thing about InfluxDB compared to other solutions is that it's schemaless and can still aggregate data pretty fast. ElasticSearch is much faster (than 0.8), but it has a big problem with indexes needing to be predefined (auto-creating mappings is highly flawed).
I might drop by the Google group with more questions.
Because ElasticSearch just assigns index settings based on the first value it gets.
For trivial values such as numbers, that's usually okay, unless it happens that the field is polymorphic (not a good schema design, of course).
But it doesn't know how to set up any of the mappings; it doesn't know whether something is a full-text field (which often requires analyzers) or an atomic, enum-type string.
It also doesn't know about dates. If you index pure JSON documents, it will simply store them as strings.
This would all have been a non-problem if updating mappings in ES were simple, but it's not. A mapping is generally append-only; if you want to change the type of a mapping, or its analyzer, or most other settings, you have to create a new index and repopulate it. Schema migrations in ES are a lot more painful than in, say, PostgreSQL.
Analytics is usually based on aggregating discrete events; it's based on irregular time series. Metrics and sensor data are usually regular time series. That is, series where you have samples taken at regular intervals of time, like once every 10 seconds.
When it comes to querying regular time series, you don't have a huge number of points to aggregate across a single series, whereas analytics can have millions in a single series that you're looking at.
Then there are other types of queries that you need in analytics that don't make sense for metrics, like getting sessions and calculating funnels.
InfluxDB is still useful for analytics today, it's just that in some instances it's more basic and crude compared to what you can do with things like MixPanel.
Is it possible that a regular time series could have better read performance (particularly in aggregations) vs an irregular one due to determinism/randomness - or is that irrelevant to the underlying implementation?
Looks interesting. This answers a lot of questions about primary data, but I'm interested to know how continuous queries [1] are handled.
If they're stored like any other data, the "inserts are far more likely than updates" assumption doesn't really make sense. For example, what if you have a continuous query grouped by year?
PS. "challenges" is mis-spelled in the last sentence.
Each of those continuous query data points would be an insert. I guess if you're recalculating it frequently, those new points would be updates; however, under non-failure conditions that won't be a problem.
If failure conditions occur, hinted handoff should take care of it, or the next CQ run that recomputes the result should make it correct after the failure condition is resolved.
Finally, those aren't contentious updates. You're not talking about multiple clients trying to make different updates to the exact same point, which is really what I mean when I talk about not optimizing for updates.
I have a question about the new tagging feature in InfluxDB 0.9 - hopefully you can clear up my confusion.
My understanding is that tags are great, as they are indexed and hence quick to search.
However, they are not suitable for situations where you have very high cardinality (> 100,000). Assuming this isn't the case, where else would you use fields over tags?
For example - apart from the indexing and cardinality issue - what are the pros/cons of
Or say you have a logline, and you're parsing various attributes out of it (meaning all the values are quite tightly associated with each other) - would you split them into separate series, each with their own set of tags, or would you store them in a single series with multiple fields?
For fields, you generally only want to put two pieces of data together in a single point (thus in two fields) if you're always going to be querying them together. Either by pulling the values out, or filtering on a WHERE clause.
Unrelated to this, we've created a line protocol to write data in and it's a much more compact way to show a point:
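(Roughly like the following; the measurement, tags, and values are made up for illustration.)

    cpu_load,host=server01,region=uswest value=0.64 1434055562000000000

That is, the measurement name plus comma-separated tags, then the fields, then an optional nanosecond timestamp.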
Aha, fair enough - so if you only sometimes query the values together, you should separate them out into discrete series =).
For a logline, the sort of metrics we'd get would be things like query duration, database, client ID etc. You'd often query them together, but you'd also query them separately, so I guess that also makes sense as multiple separate series.
Interesting, that line protocol looks cool and seems like it'd be more efficient on the wire.
Will the drivers (e.g. Python, Go) be updated to take advantage of this new endpoint?
Finally, I see there's also a ticket open for binary protocols:
The Go client has already been updated. We'll be updating the Ruby and front end JS client. Hopefully the community will jump on updating the other clients once we release 0.9.0. It's a super simple protocol so I don't imagine it'll take much work.
The binary protocol is probably much less useful now. HTTP + GZip of the line protocol will already saturate what our storage engine can do at this point. In fact, I'm going to close that out right now...
I don't know if you've done this anywhere, but have you created any guidelines on tuning Influx based on workload?
We took a look at it last year, looked fantastic until we actually gave it more than a couple of metrics - and then it crashed and burned in so many ways. I tried fiddling with sharding and replication with no great success.
This was just before you were planning to get rid of the manual configuration for these things - so perhaps it's improved since then.
Having some idea of "If you want to store {X} metrics with {Y} updates per second for {Z} time, then you'll need at least this hardware configuration" would be great.
As it is, we've stuck with graphite, and we're kinda waiting for you guys to go past 1.0.
We don't have those recommendations at the moment. We'll be putting that out over the next few months after we release 0.9.0 and some point releases after that.
Out of curiosity, how many metrics are you tracking in Graphite? What's your sampling interval?
That should be doable if you're batching data together. The next build will actually do this automatically for you if you're writing into the Graphite, CollectD, or UDP inputs. It buffers some updates in memory to batch them in a single write to the underlying storage.
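As a rough sketch of what client-side batching looks like (host, port, and database name are assumed here, using the line-protocol /write endpoint discussed further up), you just send many points in a single request body, one per line:

    curl -XPOST 'http://localhost:8086/write?db=graphite' --data-binary '
    cpu_load,host=web01 value=0.61 1434055562000000000
    cpu_load,host=web02 value=0.43 1434055562000000000
    cpu_load,host=web03 value=0.27 1434055562000000000'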
Might be worth another look for you when 0.9.0 comes out :)
> Every server in the cluster keeps an in memory copy of this cluster metadata. Each will periodically refresh the entire metadata set to pick up any changes.
How does this avoid nodes doing stale reads from the in-memory copy, resulting in each node having a slightly different out-of-date view of the cluster?
They could have an out of date view of the cluster. Like which databases exist, or which servers exist.
However, those cases are fine. If a database was created and a server doesn't have a copy of it, then when a write comes in it'll be a cache miss, so it'll hit the Raft system to get the info.
In the case of a new server joining the cluster, it won't get new data assigned until a shard group gets created and a shard gets assigned to it. When a write comes in, if a node doesn't have a shard group for that time range, it'll call out to the Raft system to get the information.
So yes, it's possible for some servers to have a stale view of this cluster metadata, but we work around it by having them request the information on demand and periodically refresh the entire thing.
I believe Raft is supposed to handle this; every modification is a log entry, and every recipient has to ack the log entry, a bit like two-phase commit.
This is incorrect; only reads from the current Raft master are guaranteed not to be stale. In the case of InfluxDB, I think caching is safe because the shard metadata is immutable.
Where are you seeing that cluster metadata is immutable? I don't even know how that would work. Surely nodes, databases, shards, users, permissions, etc. all can change?
Did you switch your Raft implementation & would you care to share your insights on the different implementations you guys have tried? (If there was a blog post on that, would appreciate the link).
This is the third Raft implementation for us. The original was goraft; then we attempted to make our own streaming Raft implementation for high throughput.
This time around we realized we should only put very low write throughput stuff through Raft. And we wanted to pick one that has been out for a while and running in production in many places.
So we chose the Hashicorp implementation because we know them and know that it's been in production for a while. Combined with the fact that it can be backed by BoltDB, which we like.
My advice to you is to (1) model in TLA+. Writeup seems fine but, as you note, distribution is non-trivial (since intuition does not sufficiently/effectively inform the mental model). (2) Do not rely on monotonic nanosecs from the Go runtime. Only usec-level precision is guaranteed monotonic. (3) If you don't own the client, you are open to Byzantine failure.
Can it be called a pure AP system if a long running network partition can cause it to become unavailable?
In our case, shard groups are part of the data that is in the CP system. We create these ahead of time, but those are what we use to determine where in the cluster a given write should go and what servers we need to hit to answer a query.
Let's call a "normal partition" one that is less than a few hours. In that case I wouldn't expect it to cause the system to become unavailable. However, there are certainly scenarios in which a longer partition would make it unavailable.
And even for a "normal partition", new databases wouldn't be available to write to.
> Being able to write and query the data is more important than having a strongly consistent view
I can imagine some use cases where this is a very reasonable assumption (statsd-style analysis & monitoring systems) but other cases where it's not so great (financial systems).
That's why later on we'll be adding per query request consistency levels. For now, we're focused on the AP use case where you don't need an absolute guarantee.
That hasn't been measured yet. That'll come in mid-to-late summer after we've released 0.9.1 or 0.9.2.
Also, on the 0.9.2 or 0.9.3 release cycle we're going to start work on a new storage engine that is custom built for this use case. That will have a massive effect on the throughput.
I'll assume that you're asking out of a genuine curiosity born from a lack of knowledge of either system and not a question of "why was this made?"
Hadoop is a computing ecosystem. The Hadoop project is not only a computing framework, but also a datacenter work scheduler (YARN), a distributed filesystem (HDFS), a computation framework (MapReduce), a database built on top of HDFS (HBase), and a whole host of other complementary technologies. Admittedly, when most people say "Hadoop", they refer to MapReduce. MapReduce is a batch computation framework principally for executing filtering or aggregation over large amounts of data (e.g. finding top referrers from request logs).
InfluxDB is a distributed timeseries database. The closest analogue in the Hadoop ecosystem would be HBase running OpenTSDB. InfluxDB is aiming to fill the niche of high-volume metric collection and analysis. A system like InfluxDB (or any other time-series storage solution) aims to observe data over time for use in dashboarding, alerting, and general analysis over time. For example, tracking pageviews per second or response times.
I encourage you to take a look at all these projects; they're fantastic when you need them.